maxframe.dataframe.groupby.DataFrameGroupBy.mf.apply_chunk#

DataFrameGroupBy.mf.apply_chunk(func: str | Callable, batch_rows=None, *, dtypes=None, dtype=None, name=None, output_type=None, index=None, skip_infer=False, order_cols=None, ascending=True, prepend_index_group_keys=False, check_output_dtypes=None, args=(), **kwargs)#

按组应用函数 func 并将结果合并。传递给函数的 pandas DataFrame 是输入 DataFrame 的一个分块,可视为一批行数据。

传递给 apply 的函数必须以一个 DataFrame 作为第一个参数,并返回一个 DataFrame、Series 或标量。apply 会负责将结果合并成一个单独的 DataFrame 或 Series。因此,apply 是一种非常灵活的分组方法。

不要期望在函数中接收到 DataFrame 的所有行,因为这取决于 MaxFrame 的实现和 MaxCompute 的内部运行状态。

参数:
  • func (callable) -- 一个可调用对象,以 DataFrame 作为第一个参数,并返回一个 DataFrame、Series 或标量。此外,该可调用对象还可以接受位置参数和关键字参数。

  • batch_rows (int) -- 指定一个批次中期望的行数,以及函数输入 DataFrame 的长度。当剩余数据不足时,实际行数可能少于该数值。

  • output_type ({'dataframe', 'series'}, default None) -- 指定返回对象的类型。详见 Notes

  • dtypes (Series, default None) -- 指定返回的 DataFrame 的数据类型。详见 Notes

  • dtype (numpy.dtype, default None) -- 指定返回的 Series 的数据类型。详见 Notes

  • name (str, default None) -- 指定返回的 Series 的名称。详见 Notes

  • index (Index, default None) -- 指定返回对象的索引。详见 Notes

  • skip_infer (bool, default False) -- 当未指定 dtypes 或 output_type 时,是否跳过推断 dtypes。一旦指定为 True,您需要通过参数或函数的类型注解显式指定 dtypes 和 output_type。

  • prepend_index_group_keys (bool, default False) -- 如果为 True,则当 group_keys=True 时,返回的 DataFrame 或 Series 的索引将自动包含组键(如果 as_index=True)或组索引(如果 as_index=False)。该选项为 True 时也会默认将分组列排除在函数输入之外,参考注意事项以获得更多信息。 .. 注意:: 在未来的版本中,prepend_index_group_keys 将默认设置为 True, 如果该参数被设置为 False,将会显示警告。为了确保您的代码在未来的版本中正常工作, 请将其设置为 True,并在索引参数或 func 的类型注解中移除组索引。

  • check_output_dtypes ({'ignore', 'warns', 'raises'}, default None) -- 输出数据类型和列的验证模式。指定时,验证用户函数是否返回具有预期数据类型的值。- 'ignore': 不执行验证 - 'warns': 验证并在不匹配时显示警告(None时的默认值)- 'raises': 验证并在不匹配时引发错误 注意:分组列会自动从验证中排除,因为它们由groupby基础架构单独管理。

  • args (tuple and dict) -- 传递给 func 的可选位置参数和关键字参数。

  • kwargs (tuple and dict) -- 传递给 func 的可选位置参数和关键字参数。

返回:

已应用

返回类型:

Series or DataFrame

参见

Series.apply

对 Series 应用一个函数。

DataFrame.apply

对 DataFrame 的每一行或每一列应用一个函数。

DataFrame.mf.apply_chunk

对 DataFrame 的行批次应用一个函数。

备注

在确定返回值的输出数据类型和形状时,MaxFrame 会尝试将 func 应用于一个模拟的分组对象,此时 apply 调用可能会失败。如果发生这种情况,则需要在 output_type 中指定 apply 调用的类型(DataFrame 或 Series)。

  • 对于 DataFrame 输出,你需要指定一个列表或 pandas Series 作为输出 DataFrame 的 dtypes

  • 对于 Series 输出,你需要指定输出 Series 的 dtypename

  • index 确定输出 DataFrame 或 Series 的索引。您可以指定一个虚拟的 pandas 索引来表示 func 输出的索引名称和类型,例如 pd.MultiIndex.from_tuples([("a", 0)], names=["key1", "key2"])。如果未提供 index,将使用输入 DataFrame 或 Series 的索引。当 prepend_index_group_keys 为 True 时,返回对象的索引将是 index 附加 groupby 带来的分组信息,这些信息随着 groupby 函数的 as_indexgroup_keys 参数变化而有所不同,具体行为与 pandas 3.0 一致。当 prepend_index_group_keys 为 False 时,您必须指定包含所有字段(包括组键)的模拟索引。由于传递完整索引定义比较复杂,prepend_index_group_keys=False 将在不久的将来被弃用。请尽可能提供 prepend_index_group_keys=True

MaxFrame 采用 pandas>=3.0 的预期行为,在用户函数输入中忽略分组列。如果你仍需要将分组列传递给函数,可以在 groupby 结果后立即选择该列,例如 df.groupby("A")[["A", "B", "C"]].mf.apply_chunk(func) 会将列 A 的数据传递给 func

batch_rows 参数控制内存使用。较大的值可能提高性能但增加OOM风险。在单个批次中处理整个分组时确保工作节点内存充足。

示例

示例1:过滤每个分组中的行#

查找每个部门中薪资超过阈值的员工。这展示了结果索引如何显示组内位置(0-n)。

>>> import maxframe.dataframe as md
>>> import pandas as pd
>>>
>>> # Create sample employee data
>>> data = {
...     'department': ['HR', 'HR', 'HR', 'IT', 'IT', 'IT', 'Finance', 'Finance'],
...     'employee_id': [1, 2, 3, 4, 5, 6, 7, 8],
...     'salary': [50000, 55000, 60000, 70000, 75000, 80000, 90000, 95000],
...     'years_experience': [2, 3, 5, 1, 4, 6, 3, 7]
... }
>>> df = md.DataFrame(data)
>>> df.execute()
  department  employee_id  salary  years_experience
0         HR            1   50000                 2
1         HR            2   55000                 3
2         HR            3   60000                 5
3         IT            4   70000                 1
4         IT            5   75000                 4
5         IT            6   80000                 6
6    Finance            7   90000                 3
7    Finance            8   95000                 7
>>> def filter_high_salary(batch_df):
...     # batch_df contains employee data for a single department
...     # Group key (department) is NOT included in the DataFrame columns
...     print(f"Processing {len(batch_df)} rows, received {batch_df}", flush=True)
...
...     # Filter: keep employees with salary > 55000
...     return batch_df[batch_df['salary'] > 55000]
>>>
>>> # Specify dtypes without the group key column (department)
>>> result_dtypes = df.dtypes[['employee_id', 'salary', 'years_experience']]
>>>
>>> result = df.groupby('department').mf.apply_chunk(
...     filter_high_salary,
...     output_type='dataframe',
...     dtypes=result_dtypes,
...     prepend_index_group_keys=True,
... )
>>> result.execute()
              employee_id  salary  years_experience
department
Finance    6            7   90000                 3
           7            8   95000                 7
HR         2            3   60000                 5
IT         3            4   70000                 1
           4            5   75000                 4
           5            6   80000                 6

结果解释:- 第一级索引("department")显示分组键值 - 第二级索引(2, 3, 4, 5, 6, 7...)是输入DataFrame的原始行索引 - 对于财务部门:原始索引为6-7的员工符合标准 - 对于人力资源部门:原始索引为2的员工符合标准 - 对于IT部门:原始索引为3-5的员工符合标准 - 分组键默认不包含在UDF输入的batch_df中,但包含在结果中 - 指定dtypes时,排除分组键列(它们是结果中的索引)

示例2:返回具有单个聚合列的DataFrame#

应用函数按部门计算平均薪资,返回具有单列和明确类型规范的DataFrame。此示例引入了``batch_rows``参数来控制批处理大小。

>>> # Specify dtypes with type annotations
>>> def calculate_avg_salary(batch_df) -> pd.DataFrame['avg_salary': 'float64']:
...     # Important: batch_df contains only non-group columns by default
...     # Group keys are not included in the UDF input
...     print(f"Processing batch with {len(batch_df)} rows")
...
...     # Return a single value as DataFrame - internal index is preserved by design
...     avg_val = batch_df['salary'].mean()
...     return pd.DataFrame({'avg_salary': [avg_val]})
>>>
>>> result = df.groupby('department').mf.apply_chunk(
...     calculate_avg_salary,
...     batch_rows=2,  # Process 2 rows per batch
...     prepend_index_group_keys=True,
... )
>>> result.execute()
              avg_salary
department
Finance    0     92500.0
HR         0     52500.0
           0     60000.0
IT         0     72500.0
           0     80000.0

结果解释:- 第一级索引("department")显示分组键值 - 第二级索引('0')是新创建的,因为每次UDF调用都返回单行DataFrame - 人力资源部门显示两行,因为batch_rows=2导致两次独立的UDF调用 - 财务和IT部门在单个批次中处理 - 当UDF返回聚合结果时,索引来自新创建的数据框

示例3:在UDF输入中包含分组键#

有时您需要在UDF中访问分组键。此示例展示如何通过显式选择分组列和其他列来包含它们。我们将过滤高薪员工,但这次在UDF输入中包含部门列。

>>> def filter_high_salary_with_dept(batch_df) -> pd.DataFrame[
...     'department': 'object', 'employee_id': 'int64', 'salary': 'float64'
... ]:
...     # Now batch_df includes the department column since we explicitly selected it
...     department = batch_df['department'].iloc[0]
...     print(f"Processing {len(batch_df)} rows for department: {department}")
...
...     # Filter: keep employees with salary > 55000 (same logic as Example 1)
...     return batch_df[batch_df['salary'] > 55000]
>>>
>>> # Include the group key by explicitly selecting it with other columns
>>> result = df.groupby('department')[['department', 'employee_id', 'salary']].mf.apply_chunk(
...     filter_high_salary_with_dept, prepend_index_group_keys=True
... )
>>> result.execute()
             department  employee_id   salary
department
Finance    6    Finance            7  90000.0
           7    Finance            8  95000.0
HR         2         HR            3  60000.0
IT         3         IT            4  70000.0
           4         IT            5  75000.0
           5         IT            6  80000.0

结果解释:- 第一级索引("department")显示分组键值 - 第二级索引(2, 3, 4, 5, 6, 7...)是输入DataFrame的原始行索引 - 通过选择['department', 'employee_id', 'salary'],我们确保部门列在UDF中可用 - UDF现在可以访问部门值(尽管在此简单过滤器中未使用)- 原始索引在结果中保留 - 过滤逻辑与示例1相同:salary > 55000

此示例演示如何通过在groupby操作中选择分组键来显式地在UDF中包含它们,使它们在函数内部可用(如有需要)。

示例 4:显式指定输出类型和索引#

当 UDF 无法在本地执行推断时,您必须通过参数或类型注解显式指定 output_type、dtypes 和 index 以确保正确执行。

>>> def create_summary_stats(batch_df):
...     # Calculate basic statistics
...     avg_salary = batch_df['salary'].mean()
...     total_salary = batch_df['salary'].sum()
...     employee_count = len(batch_df)
...
...     # Return DataFrame with correct types
...     result_df = pd.DataFrame({
...         'avg_salary': pd.Series([avg_salary], dtype='float64'),
...         'total_salary': pd.Series([total_salary], dtype='float64'),
...         'employee_count': pd.Series([employee_count], dtype='int64')
...     })
...
...     return result_df
>>>
>>> # Create inner index returned by UDF
>>> result_index = pd.Index([], dtype='int64', name='inner_index')
>>>
>>> # Explicitly specify all output parameters
>>> result = df.groupby('department').mf.apply_chunk(
...     create_summary_stats,
...     batch_rows=10000,
...     output_type='dataframe',  # specifies output type as DataFrame
...     dtypes={
...         'avg_salary': 'float64',
...         'total_salary': 'float64',
...         'employee_count': 'int'
...     }, # specifies the final dataframe column types
...     index=result_index,  # specifies the structure of the final MultiIndex result
...     prepend_index_group_keys=True,
... )
>>> result.execute()
                       avg_salary  total_salary  employee_count
department inner_index
Finance    0            92500.0      185000.0                2
HR         0            55000.0      165000.0                3
IT         0            75000.0      225000.0                3

结果解释:- 第一级索引("department")显示分组键值(字符串类型)- 第二级索引("inner_index")来自UDF返回的DataFrame(整数类型)- output_type='dataframe'告诉MaxFrame期望DataFrame输出 - dtypes定义精确的列类型以防止推断错误 - index参数指定最终MultiIndex结果的结构 - batch_rows=10000确保整个分组一起处理

为了简化输出类型定义,您也可以使用类型注解。在下面的代码片段中,pd.DataFrame 表示返回类型是具有索引名称 'inner_index' 和列 'avg_salary'、'total_salary'、'employee_count' 的 DataFrame。索引和列的类型也都有指定。

>>> def create_summary_stats(batch_df) -> pd.DataFrame[
...     {'inner_index': 'int64'},  # type of index
...     {'avg_salary': 'float64', 'total_salary': 'float64', 'employee_count': 'int64'},  # type of data
... ]:
...     # details of function omitted

关键要点:始终在以下情况下指定output_type和dtypes:1. UDF创建新的DataFrame结构 2. 本地推断可能失败 3. 您需要一致的输出格式

注意:当指定 prepend_index_group_keys=True 时,index 参数定义内部索引结构,结果索引结合了分组键(第一层,字符串)和 UDF 索引(第二层,整数)。