maxframe.dataframe.DataFrame.mf.apply_chunk#
- DataFrame.mf.apply_chunk(func: str | Callable, batch_rows=None, dtypes=None, dtype=None, name=None, output_type=None, index=None, skip_infer=False, args=(), check_output_dtypes=None, **kwargs)#
应用一个函数,该函数接收 pandas DataFrame 并输出 pandas DataFrame/Series。提供给该函数的 pandas DataFrame 是输入 dataframe 的一个区块,可视为一批行数据。
传递给此函数的对象是原始 DataFrame 的切片,包含最多 batch_rows 行数和所有列。它等效于合并多个带有
axis=1参数的df.apply输入,然后将它们传递给函数执行,从而在特定场景下提升性能。函数输出可以是 DataFrame 或 Series。apply_chunk最终会将结果合并到一个新的 DataFrame 或 Series 中。不要期望在函数中接收到 DataFrame 的所有行,因为它依赖于 MaxFrame 的实现和 MaxCompute 的内部运行状态。
- 参数:
func (str or Callable) -- 应用于 dataframe 区块的函数。
batch_rows (int) -- 指定批次中预期的行数,以及函数输入 dataframe 的长度。当剩余数据不足时,可能少于这个数量。
output_type ({'dataframe', 'series'}, default None) -- 指定返回对象的类型。详见 备注 部分。
dtypes (Series, default None) -- 指定返回 DataFrames 的 dtypes。详见 备注 部分。
dtype (numpy.dtype, default None) -- 指定返回 Series 的 dtype。详见 备注 部分。
name (str, default None) -- 指定返回 Series 的名称。详见 备注 部分。
index (Index, default None) -- 指定返回对象的索引。详见 备注 部分。
skip_infer (bool, default False) -- 当未指定 dtypes 或 output_type 时是否推断 dtypes。
check_output_dtypes (str, default None) -- 输出数据类型和列的验证模式:- 'ignore':不执行验证 - 'warns':验证并在不匹配时显示警告(None时的默认值) - 'raises':验证并在不匹配时引发错误
args (tuple) -- 除了数组/序列之外传递给
func的位置参数。**kwds -- 作为关键字参数传递给
func的额外关键字参数。
- 返回:
对 DataFrame 给定区块应用
func的结果。- 返回类型:
参见
DataFrame.apply用于非批处理操作。
Series.mf.apply_chunk对 Series 区块应用函数。
备注
当决定输出 dtypes 和返回值形状时,MaxFrame 将尝试将
func应用到一个模拟 DataFrame 上,apply 调用可能会失败。当这种情况发生时,您需要在 output_type 中指定 apply 调用的类型(DataFrame 或 Series)。对于 DataFrame 输出,您需要指定一个列表或 pandas Series 作为输出 DataFrame 的
dtypes。也可以指定输出的index。对于 Series 输出,您需要指定输出 Series 的
dtype和name。对于任何数据类型为
pandas.ArrowDtype(pyarrow.MapType)的输入,它将始终被转换为 Python dict。对于任何具有此数据类型的输出,也必须作为 Python dict 返回。
示例
>>> import numpy as np >>> import maxframe.tensor as mt >>> import maxframe.dataframe as md >>> df = md.DataFrame([[4, 9]] * 3, columns=['A', 'B']) >>> df.execute() A B 0 4 9 1 4 9 2 4 9
使用不同的 batch_rows 将收集不同的 dataframe 区块到函数中。
例如,当您使用
batch_rows=3时,意味着函数将等待直到收集到 3 行数据。>>> df.mf.apply_chunk(np.sum, batch_rows=3).execute() A 12 B 27 dtype: int64
而如果
batch_rows=2,数据将被分成至少两个片段。此外,如果您的函数改变了 dataframe 的形状,可能会导致不同的输出。>>> df.mf.apply_chunk(np.sum, batch_rows=2).execute() A 8 B 18 A 4 B 9 dtype: int64
如果函数需要一些参数,您可以使用 args 或 kwargs 来指定它们。
>>> def calc(df, x, y): ... return df * x + y >>> df.mf.apply_chunk(calc, args=(10,), y=20).execute() A B 0 60 110 1 60 110 2 60 110
批处理行将有利于消耗 dataframe 的操作,如 sklearn 预测。您可以轻松地在 MaxFrame 中使用 sklearn 执行离线推理,而 apply_chunk 使这个过程更加高效。
@with_python_requirements提供了自动打包和加载依赖项的功能。一旦您依赖某些第三方依赖项,MaxFrame 可能无法正确推断返回类型。因此,使用带有
dtype或dtypes的output_type是必要的。>>> from maxframe.udf import with_python_requirements >>> data = { ... 'A': np.random.rand(10), ... 'B': np.random.rand(10) ... } >>> pd_df = pd.DataFrame(data) >>> X = pd_df[['A']] >>> y = pd_df['B']
>>> from sklearn.model_selection import train_test_split >>> from sklearn.linear_model import LinearRegression >>> model = LinearRegression() >>> model.fit(X, y)
>>> @with_python_requirements("scikit-learn") ... def predict(df): ... predict_B = model.predict(df[["A"]]) ... return pd.Series(predict_B, index=df.A.index)
>>> df.mf.apply_chunk(predict, batch_rows=3, output_type="series", dtype="float", name="predict_B").execute() 0 -0.765025 1 -0.765025 2 -0.765025 Name: predict_B, dtype: float64
使用字典类型创建一个DataFrame。
>>> import pyarrow as pa >>> import pandas as pd >>> from maxframe.lib.dtypes_extension import dict_ >>> col_a = pd.Series( ... data=[[("k1", 1), ("k2", 2)], [("k1", 3)], None], ... index=[1, 2, 3], ... dtype=dict_(pa.string(), pa.int64()), ... ) >>> col_b = pd.Series( ... data=["A", "B", "C"], ... index=[1, 2, 3], ... ) >>> df = md.DataFrame({"A": col_a, "B": col_b}) >>> df.execute() A B 1 [('k1', 1), ('k2', 2)] A 2 [('k1', 3)] B 3 <NA> C
定义一个函数,用于在批处理中使用新的键值对更新映射类型。
>>> def custom_set_item(df): ... for name, value in df["A"].items(): ... if value is not None: ... df["A"][name]["x"] = 100 ... return df
>>> mf.apply_chunk( ... process, ... output_type="dataframe", ... dtypes=md_df.dtypes.copy(), ... batch_rows=2, ... skip_infer=True, ... index=md_df.index, ... ) A B 1 [('k1', 1), ('k2', 2), ('x', 10))] A 2 [('k1', 3), ('x', 10)] B 3 <NA> C