maxframe.dataframe.DataFrame.mf.map_reduce#

DataFrame.mf.map_reduce(mapper: Callable | None = None, reducer: Callable | None = None, group_cols: List[Any] | None = None, *, order_cols: List[Any] = None, ascending: bool | List[bool] = True, combiner: Callable = None, batch_rows: int | None = 1024, mapper_dtypes: Series = None, mapper_index: Index = None, mapper_batch_rows: int | None = None, reducer_dtypes: Series = None, reducer_index: Index = None, reducer_batch_rows: int | None = None, ignore_index: bool = False)#

Map-reduce API over certain DataFrames. This function is roughly a shortcut for

df.mf.apply_chunk(mapper).groupby(group_keys).mf.apply_chunk(reducer)
Parameters:
  • mapper (function or type) – Mapper function or class.

  • reducer (function or type) – Reducer function or class.

  • group_cols (str or list[str]) – The keys to group after mapper. If absent, all columns in the mapped DataFrame will be used.

  • order_cols (str or list[str]) – The columns to sort after groupby.

  • ascending (bool or list[bool] or None) – Whether columns should be in ascending order or not, only effective when order_cols are specified. If a list of booleans are passed, orders of every column in order_cols are specified.

  • combiner (function or class) – Combiner function or class. Should accept and returns the same schema of mapper outputs.

  • batch_rows (int or None) – Rows in batches for mappers and reducers. Ignored if mapper_batch_rows specified for mappers or reducer_batch_rows specified for reducers. 1024 by default.

  • mapper_dtypes (pd.Series or dict or None) – Output dtypes of mapper stage.

  • mapper_index (pd.Index or None) – Index of DataFrame returned by mappers.

  • mapper_batch_rows (int or None) – Rows in batches for mappers. If specified, batch_rows will be ignored for mappers.

  • reducer_dtypes (pd.Series or dict or None) – Output dtypes of reducer stage.

  • reducer_index (pd.Index or None) – Index of DataFrame returned by reducers.

  • reducer_batch_rows (int or None) – Rows in batches for mappers. If specified, batch_rows will be ignored for reducers.

  • ignore_index (bool) – If true, indexes generated at mapper or reducer functions will be ignored.

Returns:

output – Result DataFrame after map and reduce.

Return type:

DataFrame

Examples

We first define a DataFrame with a column of several words.

>>> from collections import defaultdict
>>> import maxframe.dataframe as md
>>> from maxframe.udf import with_running_options
>>> df = pd.DataFrame(
>>>     {
>>>         "name": ["name key", "name", "key", "name", "key name"],
>>>         "id": [4, 2, 4, 3, 3],
>>>         "fid": [5.3, 3.5, 4.2, 2.2, 4.1],
>>>     }
>>> )

Then we write a mapper function which accepts batches in the DataFrame and returns counts of words in every row.

>>> def mapper(batch):
>>>     word_to_count = defaultdict(lambda: 0)
>>>     for words in batch["name"]:
>>>         for w in words.split():
>>>             word_to_count[w] += 1
>>>     return pd.DataFrame(
>>>         [list(tp) for tp in word_to_count.items()], columns=["word", "count"]
>>>     )

After that we write a reducer function which aggregates records with the same word. Running options such as CPU specifications can be supplied as well.

>>> @with_running_options(cpu=2)
>>> class TestReducer:
>>>     def __init__(self):
>>>         self._word_to_count = defaultdict(lambda: 0)
>>>
>>>     def __call__(self, batch, end=False):
>>>         word = None
>>>         for _, row in batch.iterrows():
>>>             word = row.iloc[0]
>>>             self._word_to_count[row.iloc[0]] += row.iloc[1]
>>>         if end:
>>>             return pd.DataFrame(
>>>                 [[word, self._word_to_count[word]]], columns=["word", "count"]
>>>             )
>>>
>>>     def close(self):
>>>         # you can do several cleanups here
>>>         print("close")

Finally we can call map_reduce with mappers and reducers specified above.

>>> res = df.mf.map_reduce(
>>>     mapper,
>>>     TestReducer,
>>>     group_cols=["word"],
>>>     mapper_dtypes={"word": "str", "count": "int"},
>>>     mapper_index=pd.Index([0]),
>>>     reducer_dtypes={"word": "str", "count": "int"},
>>>     reducer_index=pd.Index([0]),
>>>     ignore_index=True,
>>> )
>>> res.execute().fetch()
   word  count
0   key      3
1  name      4

See also

DataFrame.mf.apply_chunk, DataFrame.groupby.mf.apply_chunk