maxframe.dataframe.DataFrame.mf.map_reduce#
- DataFrame.mf.map_reduce(mapper: Callable | None = None, reducer: Callable | None = None, group_cols: List[Any] | None = None, *, order_cols: List[Any] = None, ascending: bool | List[bool] = True, combiner: Callable = None, batch_rows: int | None = 1024, mapper_dtypes: Series = None, mapper_index: Index = None, mapper_batch_rows: int | None = None, reducer_dtypes: Series = None, reducer_index: Index = None, reducer_batch_rows: int | None = None, ignore_index: bool = False)#
在某些 DataFrame 上进行 Map-reduce 操作的 API。此函数大致是以下操作的快捷方式:
df.mf.apply_chunk(mapper).groupby(group_keys).mf.apply_chunk(reducer)
- 参数:
mapper (function or type) -- 映射器函数或类。
reducer (function or type) -- 归约器函数或类。
group_cols (str or list[str]) -- 映射后用于分组的键。如果未指定,则使用映射后的 DataFrame 中的所有列。
ascending (bool or list[bool] or None) -- 列是否应按升序排列,仅在指定了 order_cols 时有效。如果传入布尔值列表,则指定了 order_cols 中每个列的排序方式。
combiner (function or class) -- 组合器函数或类。应接受并返回与映射器输出相同结构的数据。
batch_rows (int or None) -- 映射器和归约器的批处理行数。如果为映射器指定了 mapper_batch_rows 或为归约器指定了 reducer_batch_rows,则忽略此参数。默认值为 1024。
mapper_dtypes (pd.Series or dict or None) -- 映射阶段的输出数据类型。
mapper_index (pd.Index or None) -- 映射器返回的 DataFrame 的索引。
mapper_batch_rows (int or None) -- 映射器的批处理行数。如果指定此参数,则忽略 batch_rows 对于映射器的设置。
reducer_dtypes (pd.Series or dict or None) -- 归约阶段的输出数据类型。
reducer_index (pd.Index or None) -- 归约器返回的 DataFrame 的索引。
reducer_batch_rows (int or None) -- 归约器的批处理行数。如果指定此参数,则忽略 batch_rows 对于归约器的设置。
ignore_index (bool) -- 如果为真,则忽略映射器或归约器函数生成的索引。
- 返回:
output -- 映射和归约操作后的结果 DataFrame。
- 返回类型:
示例
我们首先定义一个包含若干单词列的 DataFrame。
>>> from collections import defaultdict >>> import maxframe.dataframe as md >>> from maxframe.udf import with_running_options >>> df = pd.DataFrame( >>> { >>> "name": ["name key", "name", "key", "name", "key name"], >>> "id": [4, 2, 4, 3, 3], >>> "fid": [5.3, 3.5, 4.2, 2.2, 4.1], >>> } >>> )
然后我们编写一个映射器函数,该函数接受 DataFrame 中的批次数据,并返回每行中单词的计数。
>>> def mapper(batch): >>> word_to_count = defaultdict(lambda: 0) >>> for words in batch["name"]: >>> for w in words.split(): >>> word_to_count[w] += 1 >>> return pd.DataFrame( >>> [list(tp) for tp in word_to_count.items()], columns=["word", "count"] >>> )
之后我们编写一个归约器函数,用于聚合具有相同单词的记录。还可以提供运行选项,例如 CPU 规格。
>>> @with_running_options(cpu=2) >>> class TestReducer: >>> def __init__(self): >>> self._word_to_count = defaultdict(lambda: 0) >>> >>> def __call__(self, batch, end=False): >>> word = None >>> for _, row in batch.iterrows(): >>> word = row.iloc[0] >>> self._word_to_count[row.iloc[0]] += row.iloc[1] >>> if end: >>> return pd.DataFrame( >>> [[word, self._word_to_count[word]]], columns=["word", "count"] >>> ) >>> >>> def close(self): >>> # you can do several cleanups here >>> print("close")
最后,我们可以使用上面指定的映射器和归约器调用 map_reduce。
>>> res = df.mf.map_reduce( >>> mapper, >>> TestReducer, >>> group_cols=["word"], >>> mapper_dtypes={"word": "str", "count": "int"}, >>> mapper_index=pd.Index([0]), >>> reducer_dtypes={"word": "str", "count": "int"}, >>> reducer_index=pd.Index([0]), >>> ignore_index=True, >>> ) >>> res.execute().fetch() word count 0 key 3 1 name 4
参见
DataFrame.mf.apply_chunk,DataFrame.groupby.mf.apply_chunk