maxframe.dataframe.DataFrame.mf.apply_chunk#

DataFrame.mf.apply_chunk(func: str | Callable, batch_rows=None, dtypes=None, dtype=None, name=None, output_type=None, index=None, skip_infer=False, args=(), check_output_dtypes=None, **kwargs)#

应用一个函数,该函数接收 pandas DataFrame 并输出 pandas DataFrame/Series。提供给该函数的 pandas DataFrame 是输入 dataframe 的一个区块,可视为一批行数据。

传递给此函数的对象是原始 DataFrame 的切片,包含最多 batch_rows 行数和所有列。它等效于合并多个带有 axis=1 参数的 df.apply 输入,然后将它们传递给函数执行,从而在特定场景下提升性能。函数输出可以是 DataFrame 或 Series。apply_chunk 最终会将结果合并到一个新的 DataFrame 或 Series 中。

不要期望在函数中接收到 DataFrame 的所有行,因为它依赖于 MaxFrame 的实现和 MaxCompute 的内部运行状态。

参数:
  • func (str or Callable) -- 应用于 dataframe 区块的函数。

  • batch_rows (int) -- 指定批次中预期的行数,以及函数输入 dataframe 的长度。当剩余数据不足时,可能少于这个数量。

  • output_type ({'dataframe', 'series'}, default None) -- 指定返回对象的类型。详见 备注 部分。

  • dtypes (Series, default None) -- 指定返回 DataFrames 的 dtypes。详见 备注 部分。

  • dtype (numpy.dtype, default None) -- 指定返回 Series 的 dtype。详见 备注 部分。

  • name (str, default None) -- 指定返回 Series 的名称。详见 备注 部分。

  • index (Index, default None) -- 指定返回对象的索引。详见 备注 部分。

  • skip_infer (bool, default False) -- 当未指定 dtypes 或 output_type 时是否推断 dtypes。

  • check_output_dtypes (str, default None) -- 输出数据类型和列的验证模式:- 'ignore':不执行验证 - 'warns':验证并在不匹配时显示警告(None时的默认值) - 'raises':验证并在不匹配时引发错误

  • args (tuple) -- 除了数组/序列之外传递给 func 的位置参数。

  • **kwds -- 作为关键字参数传递给 func 的额外关键字参数。

返回:

对 DataFrame 给定区块应用 func 的结果。

返回类型:

Series or DataFrame

参见

DataFrame.apply

用于非批处理操作。

Series.mf.apply_chunk

对 Series 区块应用函数。

备注

当决定输出 dtypes 和返回值形状时,MaxFrame 将尝试将 func 应用到一个模拟 DataFrame 上,apply 调用可能会失败。当这种情况发生时,您需要在 output_type 中指定 apply 调用的类型(DataFrame 或 Series)。

  • 对于 DataFrame 输出,您需要指定一个列表或 pandas Series 作为输出 DataFrame 的 dtypes。也可以指定输出的 index

  • 对于 Series 输出,您需要指定输出 Series 的 dtypename

  • 对于任何数据类型为 pandas.ArrowDtype(pyarrow.MapType) 的输入,它将始终被转换为 Python dict。对于任何具有此数据类型的输出,也必须作为 Python dict 返回。

示例

>>> import numpy as np
>>> import maxframe.tensor as mt
>>> import maxframe.dataframe as md
>>> df = md.DataFrame([[4, 9]] * 3, columns=['A', 'B'])
>>> df.execute()
   A  B
0  4  9
1  4  9
2  4  9

使用不同的 batch_rows 将收集不同的 dataframe 区块到函数中。

例如,当您使用 batch_rows=3 时,意味着函数将等待直到收集到 3 行数据。

>>> df.mf.apply_chunk(np.sum, batch_rows=3).execute()
A    12
B    27
dtype: int64

而如果 batch_rows=2,数据将被分成至少两个片段。此外,如果您的函数改变了 dataframe 的形状,可能会导致不同的输出。

>>> df.mf.apply_chunk(np.sum, batch_rows=2).execute()
A     8
B    18
A     4
B     9
dtype: int64

如果函数需要一些参数,您可以使用 args 或 kwargs 来指定它们。

>>> def calc(df, x, y):
...    return df * x + y
>>> df.mf.apply_chunk(calc, args=(10,), y=20).execute()
    A    B
0  60  110
1  60  110
2  60  110

批处理行将有利于消耗 dataframe 的操作,如 sklearn 预测。您可以轻松地在 MaxFrame 中使用 sklearn 执行离线推理,而 apply_chunk 使这个过程更加高效。@with_python_requirements 提供了自动打包和加载依赖项的功能。

一旦您依赖某些第三方依赖项,MaxFrame 可能无法正确推断返回类型。因此,使用带有 dtypedtypesoutput_type 是必要的。

>>> from maxframe.udf import with_python_requirements
>>> data = {
...     'A': np.random.rand(10),
...     'B': np.random.rand(10)
... }
>>> pd_df = pd.DataFrame(data)
>>> X = pd_df[['A']]
>>> y = pd_df['B']
>>> from sklearn.model_selection import train_test_split
>>> from sklearn.linear_model import LinearRegression
>>> model = LinearRegression()
>>> model.fit(X, y)
>>> @with_python_requirements("scikit-learn")
... def predict(df):
...     predict_B = model.predict(df[["A"]])
...     return pd.Series(predict_B, index=df.A.index)
>>> df.mf.apply_chunk(predict, batch_rows=3, output_type="series", dtype="float", name="predict_B").execute()
0   -0.765025
1   -0.765025
2   -0.765025
Name: predict_B, dtype: float64

使用字典类型创建一个DataFrame。

>>> import pyarrow as pa
>>> import pandas as pd
>>> from maxframe.lib.dtypes_extension import dict_
>>> col_a = pd.Series(
...     data=[[("k1", 1), ("k2", 2)], [("k1", 3)], None],
...     index=[1, 2, 3],
...     dtype=dict_(pa.string(), pa.int64()),
... )
>>> col_b = pd.Series(
...     data=["A", "B", "C"],
...     index=[1, 2, 3],
... )
>>> df = md.DataFrame({"A": col_a, "B": col_b})
>>> df.execute()
                        A  B
1  [('k1', 1), ('k2', 2)]  A
2             [('k1', 3)]  B
3                    <NA>  C

定义一个函数,用于在批处理中使用新的键值对更新映射类型。

>>> def custom_set_item(df):
...     for name, value in df["A"].items():
...         if value is not None:
...             df["A"][name]["x"] = 100
...     return df
>>> mf.apply_chunk(
...     process,
...     output_type="dataframe",
...     dtypes=md_df.dtypes.copy(),
...     batch_rows=2,
...     skip_infer=True,
...     index=md_df.index,
... )
                                    A  B
1  [('k1', 1), ('k2', 2), ('x', 10))]  A
2              [('k1', 3), ('x', 10)]  B
3                                <NA>  C