maxframe.dataframe.DataFrame.mf.map_reduce#

DataFrame.mf.map_reduce(mapper: Callable | None = None, reducer: Callable | None = None, group_cols: List[Any] | None = None, *, order_cols: List[Any] = None, ascending: bool | List[bool] = True, combiner: Callable = None, batch_rows: int | None = 1024, mapper_dtypes: Series = None, mapper_index: Index = None, mapper_batch_rows: int | None = None, reducer_dtypes: Series = None, reducer_index: Index = None, reducer_batch_rows: int | None = None, ignore_index: bool = False)#

在某些 DataFrame 上进行 Map-reduce 操作的 API。此函数大致是以下操作的快捷方式:

df.mf.apply_chunk(mapper).groupby(group_keys).mf.apply_chunk(reducer)
参数:
  • mapper (function or type) -- 映射器函数或类。

  • reducer (function or type) -- 归约器函数或类。

  • group_cols (str or list[str]) -- 映射后用于分组的键。如果未指定,则使用映射后的 DataFrame 中的所有列。

  • order_cols (str or list[str]) -- 按 groupby 分组后要排序的列。

  • ascending (bool or list[bool] or None) -- 列是否应按升序排列,仅在指定了 order_cols 时有效。如果传入布尔值列表,则指定了 order_cols 中每个列的排序方式。

  • combiner (function or class) -- 组合器函数或类。应接受并返回与映射器输出相同结构的数据。

  • batch_rows (int or None) -- 映射器和归约器的批处理行数。如果为映射器指定了 mapper_batch_rows 或为归约器指定了 reducer_batch_rows,则忽略此参数。默认值为 1024。

  • mapper_dtypes (pd.Series or dict or None) -- 映射阶段的输出数据类型。

  • mapper_index (pd.Index or None) -- 映射器返回的 DataFrame 的索引。

  • mapper_batch_rows (int or None) -- 映射器的批处理行数。如果指定此参数,则忽略 batch_rows 对于映射器的设置。

  • reducer_dtypes (pd.Series or dict or None) -- 归约阶段的输出数据类型。

  • reducer_index (pd.Index or None) -- 归约器返回的 DataFrame 的索引。

  • reducer_batch_rows (int or None) -- 归约器的批处理行数。如果指定此参数,则忽略 batch_rows 对于归约器的设置。

  • ignore_index (bool) -- 如果为真,则忽略映射器或归约器函数生成的索引。

返回:

output -- 映射和归约操作后的结果 DataFrame。

返回类型:

DataFrame

示例

我们首先定义一个包含若干单词列的 DataFrame。

>>> from collections import defaultdict
>>> import maxframe.dataframe as md
>>> from maxframe.udf import with_running_options
>>> df = pd.DataFrame(
>>>     {
>>>         "name": ["name key", "name", "key", "name", "key name"],
>>>         "id": [4, 2, 4, 3, 3],
>>>         "fid": [5.3, 3.5, 4.2, 2.2, 4.1],
>>>     }
>>> )

然后我们编写一个映射器函数,该函数接受 DataFrame 中的批次数据,并返回每行中单词的计数。

>>> def mapper(batch):
>>>     word_to_count = defaultdict(lambda: 0)
>>>     for words in batch["name"]:
>>>         for w in words.split():
>>>             word_to_count[w] += 1
>>>     return pd.DataFrame(
>>>         [list(tp) for tp in word_to_count.items()], columns=["word", "count"]
>>>     )

之后我们编写一个归约器函数,用于聚合具有相同单词的记录。还可以提供运行选项,例如 CPU 规格。

>>> @with_running_options(cpu=2)
>>> class TestReducer:
>>>     def __init__(self):
>>>         self._word_to_count = defaultdict(lambda: 0)
>>>
>>>     def __call__(self, batch, end=False):
>>>         word = None
>>>         for _, row in batch.iterrows():
>>>             word = row.iloc[0]
>>>             self._word_to_count[row.iloc[0]] += row.iloc[1]
>>>         if end:
>>>             return pd.DataFrame(
>>>                 [[word, self._word_to_count[word]]], columns=["word", "count"]
>>>             )
>>>
>>>     def close(self):
>>>         # you can do several cleanups here
>>>         print("close")

最后,我们可以使用上面指定的映射器和归约器调用 map_reduce

>>> res = df.mf.map_reduce(
>>>     mapper,
>>>     TestReducer,
>>>     group_cols=["word"],
>>>     mapper_dtypes={"word": "str", "count": "int"},
>>>     mapper_index=pd.Index([0]),
>>>     reducer_dtypes={"word": "str", "count": "int"},
>>>     reducer_index=pd.Index([0]),
>>>     ignore_index=True,
>>> )
>>> res.execute().fetch()
   word  count
0   key      3
1  name      4

参见

DataFrame.mf.apply_chunk, DataFrame.groupby.mf.apply_chunk