maxframe.dataframe.DataFrame.mf.map_reduce
- DataFrame.mf.map_reduce(mapper: Callable | None = None, reducer: Callable | None = None, group_cols: List[Any] | None = None, *, order_cols: List[Any] = None, ascending: bool | List[bool] = True, combiner: Callable = None, batch_rows: int | None = 1024, mapper_dtypes: Series = None, mapper_index: Index = None, mapper_batch_rows: int | None = None, reducer_dtypes: Series = None, reducer_index: Index = None, reducer_batch_rows: int | None = None, ignore_index: bool = False)
Map-reduce API over a DataFrame. This function is roughly a shortcut for
df.mf.apply_chunk(mapper).groupby(group_cols).mf.apply_chunk(reducer)
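As a rough mental model, the mapper, groupby and reducer pipeline can be imitated in a single process with plain pandas. This is an illustration under stated assumptions, not MaxFrame's implementation: local_map_reduce, count_words and sum_counts are hypothetical names, reducers here are plain functions called once per whole group, and combiner, order_cols, dtype/index metadata and distributed execution are all left out.

```python
import pandas as pd


def local_map_reduce(df, mapper, reducer, group_cols, batch_rows=1024):
    """Hypothetical single-process imitation of
    df.mf.apply_chunk(mapper).groupby(group_cols).mf.apply_chunk(reducer)."""
    # Map stage: call the mapper on consecutive slices of at most batch_rows rows.
    batches = [df.iloc[start : start + batch_rows] for start in range(0, len(df), batch_rows)]
    mapped = pd.concat([mapper(b) for b in batches], ignore_index=True)
    # Reduce stage: call the reducer once per group of mapped records.
    reduced = [reducer(group) for _, group in mapped.groupby(group_cols, sort=True)]
    return pd.concat(reduced, ignore_index=True)


def count_words(batch):
    # Mapper: one row per distinct word in this batch, with its count.
    counts = batch["name"].str.split().explode().value_counts()
    return counts.rename_axis("word").reset_index(name="count")


def sum_counts(group):
    # Reducer: every row in `group` has the same word; add up their counts.
    return pd.DataFrame(
        {"word": [group["word"].iloc[0]], "count": [group["count"].sum()]}
    )


df = pd.DataFrame({"name": ["name key", "name", "key", "name", "key name"]})
result = local_map_reduce(df, count_words, sum_counts, group_cols=["word"], batch_rows=2)
print(result)
```

Grouping with sort=True only makes the local output order deterministic; it says nothing about the row order MaxFrame produces.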
- Parameters:
mapper (function or type) – Mapper function or class.
reducer (function or type) – Reducer function or class.
group_cols (str or list[str]) – Columns to group by after the mapper stage. If absent, all columns in the mapped DataFrame are used.
order_cols (str or list[str]) – Columns to sort by after grouping.
ascending (bool or list[bool] or None) – Whether to sort in ascending order. Only effective when order_cols is specified. If a list of booleans is passed, each element specifies the order of the corresponding column in order_cols.
combiner (function or class) – Combiner function or class. Should accept and return data with the same schema as the mapper outputs.
batch_rows (int or None) – Number of rows per batch for mappers and reducers. Ignored for mappers if mapper_batch_rows is specified, and for reducers if reducer_batch_rows is specified. 1024 by default.
mapper_dtypes (pd.Series or dict or None) – Output dtypes of the mapper stage.
mapper_index (pd.Index or None) – Index of the DataFrame returned by mappers.
mapper_batch_rows (int or None) – Number of rows per batch for mappers. If specified, batch_rows is ignored for mappers.
reducer_dtypes (pd.Series or dict or None) – Output dtypes of the reducer stage.
reducer_index (pd.Index or None) – Index of the DataFrame returned by reducers.
reducer_batch_rows (int or None) – Number of rows per batch for reducers. If specified, batch_rows is ignored for reducers.
ignore_index (bool) – If True, indexes generated by mapper or reducer functions are ignored.
- Returns:
output – Result DataFrame after map and reduce.
- Return type:
DataFrame
Examples
We first define a DataFrame whose name column holds one or more words per row.
>>> from collections import defaultdict
>>> import pandas as pd
>>> import maxframe.dataframe as md
>>> from maxframe.udf import with_running_options
>>> df = md.DataFrame(
...     {
...         "name": ["name key", "name", "key", "name", "key name"],
...         "id": [4, 2, 4, 3, 3],
...         "fid": [5.3, 3.5, 4.2, 2.2, 4.1],
...     }
... )
Then we write a mapper function which accepts a batch of rows from the DataFrame and returns the count of each word in that batch.
>>> def mapper(batch):
...     word_to_count = defaultdict(lambda: 0)
...     for words in batch["name"]:
...         for w in words.split():
...             word_to_count[w] += 1
...     return pd.DataFrame(
...         [list(tp) for tp in word_to_count.items()], columns=["word", "count"]
...     )
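A mapper can be checked before submitting a job by calling it directly on plain pandas frames standing in for input batches. The sketch below does that for the mapper above and adds a hypothetical combiner, combine_counts, that sums duplicate words across concatenated mapper outputs while keeping the mapper's word/count schema, which is the requirement stated for the combiner parameter. When, and on which data, MaxFrame actually invokes a combiner is an assumption here.

```python
from collections import defaultdict

import pandas as pd


def mapper(batch):
    # Same logic as the mapper above: count each word in the "name" column.
    word_to_count = defaultdict(lambda: 0)
    for words in batch["name"]:
        for w in words.split():
            word_to_count[w] += 1
    return pd.DataFrame(
        [list(tp) for tp in word_to_count.items()], columns=["word", "count"]
    )


def combine_counts(mapped):
    # Hypothetical combiner: input and output both use the mapper schema
    # (columns "word" and "count"); rows with the same word are summed.
    return mapped.groupby("word", as_index=False, sort=True)["count"].sum()


# Two plain pandas frames standing in for two input batches.
batch_a = pd.DataFrame({"name": ["name key", "name"]})
batch_b = pd.DataFrame({"name": ["key", "name", "key name"]})

mapped = pd.concat([mapper(batch_a), mapper(batch_b)], ignore_index=True)
combined = combine_counts(mapped)
print(combined)
```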
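After that we write a reducer class which aggregates records with the same word: its __call__ method accumulates counts across batches and returns a result only when end is True. Running options such as CPU specifications can be supplied through with_running_options.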
>>> @with_running_options(cpu=2)
... class TestReducer:
...     def __init__(self):
...         self._word_to_count = defaultdict(lambda: 0)
...
...     def __call__(self, batch, end=False):
...         word = None
...         for _, row in batch.iterrows():
...             word = row.iloc[0]
...             self._word_to_count[row.iloc[0]] += row.iloc[1]
...         if end:
...             return pd.DataFrame(
...                 [[word, self._word_to_count[word]]], columns=["word", "count"]
...             )
...
...     def close(self):
...         # perform any cleanup here
...         print("close")
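The end parameter suggests that a reducer instance may receive one group in several batches and is expected to emit its result on the last one. The sketch below replays that assumed calling convention locally; drive_reducer is a hypothetical helper, and whether MaxFrame creates one instance per group or reuses instances across groups is not stated on this page. The class is copied without @with_running_options so it runs with plain pandas.

```python
from collections import defaultdict

import pandas as pd


class TestReducer:
    # Same logic as the reducer above, without @with_running_options.
    def __init__(self):
        self._word_to_count = defaultdict(lambda: 0)

    def __call__(self, batch, end=False):
        word = None
        for _, row in batch.iterrows():
            word = row.iloc[0]
            self._word_to_count[row.iloc[0]] += row.iloc[1]
        if end:
            return pd.DataFrame(
                [[word, self._word_to_count[word]]], columns=["word", "count"]
            )

    def close(self):
        # Cleanup would go here.
        pass


def drive_reducer(reducer_cls, group, batch_rows):
    # Hypothetical driver: one fresh instance per group, fed in batches,
    # with end=True only on the last batch, then close().
    reducer = reducer_cls()
    result = None
    for start in range(0, len(group), batch_rows):
        is_last = start + batch_rows >= len(group)
        out = reducer(group.iloc[start : start + batch_rows], end=is_last)
        if out is not None:
            result = out
    reducer.close()
    return result


# All mapped records for the word "name", fed in batches of one row.
group = pd.DataFrame({"word": ["name", "name"], "count": [2, 2]})
reduced = drive_reducer(TestReducer, group, batch_rows=1)
print(reduced)
```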
Finally we call map_reduce with the mapper and reducer defined above.
>>> res = df.mf.map_reduce(
...     mapper,
...     TestReducer,
...     group_cols=["word"],
...     mapper_dtypes={"word": "str", "count": "int"},
...     mapper_index=pd.Index([0]),
...     reducer_dtypes={"word": "str", "count": "int"},
...     reducer_index=pd.Index([0]),
...     ignore_index=True,
... )
>>> res.execute().fetch()
   word  count
0   key      3
1  name      4
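The expected counts can be cross-checked locally with plain pandas, without calling map_reduce, by splitting each name into words and counting them. This is only a sanity check of the example data and output, not a substitute for the API.

```python
import pandas as pd

# The same data as the example, as a plain pandas DataFrame.
df = pd.DataFrame(
    {
        "name": ["name key", "name", "key", "name", "key name"],
        "id": [4, 2, 4, 3, 3],
        "fid": [5.3, 3.5, 4.2, 2.2, 4.1],
    }
)

# One row per word occurrence, then count occurrences of each word.
expected = (
    df["name"]
    .str.split()
    .explode()
    .value_counts()
    .sort_index()
    .rename_axis("word")
    .reset_index(name="count")
)
print(expected)
```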
See also
DataFrame.mf.apply_chunk, DataFrame.groupby.mf.apply_chunk