Source code for maxframe.dataframe.merge.concat

# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Union

import pandas as pd

from ... import opcodes
from ...core import ENTITY_TYPE, EntityData, OutputType
from ...protocol import DefaultIndexType
from ...serialization.serializables import (
    AnyField,
    BoolField,
    EnumField,
    FieldTypes,
    ListField,
    StringField,
)
from ...utils import lazy_import
from ..core import DataFrame, DataFrameIndexTypeMixin, Series
from ..operators import SERIES_TYPE, DataFrameOperator, DataFrameOperatorMixin
from ..utils import (
    build_empty_df,
    build_empty_series,
    get_index_value_by_default_index_type,
    parse_index,
    validate_axis,
    validate_default_index_type,
)

cudf = lazy_import("cudf")


class DataFrameConcat(
    DataFrameOperator, DataFrameOperatorMixin, DataFrameIndexTypeMixin
):
    _op_type_ = opcodes.CONCATENATE

    axis = AnyField("axis", default=None)
    join = StringField("join", default=None)
    ignore_index = BoolField("ignore_index", default=None)
    keys = ListField("keys", default=None)
    levels = ListField("levels", default=None)
    names = ListField("names", default=None)
    verify_integrity = BoolField("verify_integrity", default=None)
    sort = BoolField("sort", default=None)
    copy_ = BoolField("copy", default=None)
    default_index_type = EnumField(
        "default_index_type", DefaultIndexType, FieldTypes.int8, default=None
    )

    def __init__(self, copy=None, output_types=None, **kw):
        super().__init__(copy_=copy, _output_types=output_types, **kw)

    @property
    def level(self):
        return self.levels

    @property
    def name(self):
        return self.names

    @classmethod
    def _concat_index(cls, df_or_series_list: Union[List[DataFrame], List[Series]]):
        concat_index = None
        all_indexes_have_value = all(
            input.index_value.has_value() for input in df_or_series_list
        )

        def _concat(prev_index: pd.Index, cur_index: pd.Index):
            if prev_index is None:
                return cur_index

            if (
                all_indexes_have_value
                and isinstance(prev_index, pd.RangeIndex)
                and isinstance(cur_index, pd.RangeIndex)
            ):
                # handle RangeIndex that append may generate huge amount of data
                # e.g. pd.RangeIndex(10_000) and pd.RangeIndex(10_000)
                # will generate a Int64Index full of data
                # for details see GH#1647
                prev_stop = prev_index.start + prev_index.size * prev_index.step
                cur_start = cur_index.start
                if prev_stop == cur_start and prev_index.step == cur_index.step:
                    # continuous RangeIndex, still return RangeIndex
                    return prev_index.append(cur_index)
                else:
                    # otherwise, return an empty index
                    return pd.Index([], dtype=prev_index.dtype)
            elif isinstance(prev_index, pd.RangeIndex):
                return pd.Index([], prev_index.dtype).append(cur_index)
            elif isinstance(cur_index, pd.RangeIndex):
                return prev_index.append(pd.Index([], cur_index.dtype))
            return prev_index.append(cur_index)

        for input in df_or_series_list:
            concat_index = _concat(concat_index, input.index_value.to_pandas())

        return concat_index

    def _call_series(self, objs):
        if self.axis == 0:
            row_length = 0
            for series in objs:
                row_length += series.shape[0]
            if self.ignore_index:
                idx_length = 0 if pd.isna(row_length) else row_length
                index_value = get_index_value_by_default_index_type(
                    self.default_index_type, idx_length, args=(type(self), objs)
                )
            else:
                index = self._concat_index(objs)
                index_value = parse_index(index, objs)
            obj_names = {obj.name for obj in objs}
            return self.new_series(
                objs,
                shape=(row_length,),
                dtype=objs[0].dtype,
                index_value=index_value,
                name=objs[0].name if len(obj_names) == 1 else None,
            )
        else:
            col_length = 0
            columns = []
            dtypes = dict()
            undefined_name = 0
            for series in objs:
                if series.name is None:
                    dtypes[undefined_name] = series.dtype
                    undefined_name += 1
                    columns.append(undefined_name)
                else:
                    dtypes[series.name] = series.dtype
                    columns.append(series.name)
                col_length += 1
            if self.ignore_index or undefined_name == len(objs):
                columns_value = parse_index(pd.RangeIndex(col_length))
            else:
                columns_value = parse_index(pd.Index(columns), store_data=True)

            shape = (objs[0].shape[0], col_length)
            return self.new_dataframe(
                objs,
                shape=shape,
                dtypes=pd.Series(dtypes),
                index_value=objs[0].index_value,
                columns_value=columns_value,
            )

    def _call_dataframes(self, objs):
        if self.axis == 0:
            row_length = 0
            empty_dfs = []
            for df in objs:
                row_length += df.shape[0]
                if df.ndim == 2:
                    empty_dfs.append(build_empty_df(df.dtypes))
                else:
                    empty_dfs.append(build_empty_series(df.dtype, name=df.name))

            emtpy_result = pd.concat(empty_dfs, join=self.join, sort=self.sort)
            shape = (row_length, emtpy_result.shape[1])
            columns_value = parse_index(emtpy_result.columns, store_data=True)

            if self.join == "inner":
                objs = [o[list(emtpy_result.columns)] for o in objs]

            if self.ignore_index:
                idx_length = 0 if pd.isna(row_length) else row_length
                index_value = get_index_value_by_default_index_type(
                    self.default_index_type, idx_length, args=(type(self), objs)
                )
            else:
                index = self._concat_index(objs)
                index_value = parse_index(index, objs)

            new_objs = []
            for obj in objs:
                if obj.ndim != 2:
                    # series
                    new_obj = obj.to_frame().reindex(columns=emtpy_result.dtypes.index)
                else:
                    # dataframe
                    if list(obj.dtypes.index) != list(emtpy_result.dtypes.index):
                        new_obj = obj.reindex(columns=emtpy_result.dtypes.index)
                    else:
                        new_obj = obj
                new_objs.append(new_obj)

            return self.new_dataframe(
                new_objs,
                shape=shape,
                dtypes=emtpy_result.dtypes,
                index_value=index_value,
                columns_value=columns_value,
            )
        else:
            col_length = 0
            empty_dfs = []
            for df in objs:
                if df.ndim == 2:
                    # DataFrame
                    col_length += df.shape[1]
                    empty_dfs.append(build_empty_df(df.dtypes))
                else:
                    # Series
                    col_length += 1
                    empty_dfs.append(build_empty_series(df.dtype, name=df.name))

            emtpy_result = pd.concat(empty_dfs, join=self.join, axis=1, sort=True)
            if self.ignore_index:
                columns_value = parse_index(pd.RangeIndex(col_length))
            else:
                columns_value = parse_index(
                    pd.Index(emtpy_result.columns), store_data=True
                )

            if self.ignore_index or len({o.index_value.key for o in objs}) == 1:
                new_objs = [obj if obj.ndim == 2 else obj.to_frame() for obj in objs]
            else:  # pragma: no cover
                raise NotImplementedError(
                    "Does not support concat dataframes which has different index"
                )

            shape = (objs[0].shape[0], col_length)
            return self.new_dataframe(
                new_objs,
                shape=shape,
                dtypes=emtpy_result.dtypes,
                index_value=objs[0].index_value,
                columns_value=columns_value,
            )

    def __call__(self, objs):
        if all(isinstance(obj, SERIES_TYPE) for obj in objs):
            self.output_types = [OutputType.series]
            return self._call_series(objs)
        else:
            self.output_types = [OutputType.dataframe]
            return self._call_dataframes(objs)


class GroupByConcat(DataFrameOperator, DataFrameOperatorMixin):
    _op_type_ = opcodes.GROUPBY_CONCAT

    _groups = ListField("groups", FieldTypes.key)
    _groupby_params = AnyField("groupby_params")

    def __init__(self, groups=None, groupby_params=None, output_types=None, **kw):
        super().__init__(
            _groups=groups,
            _groupby_params=groupby_params,
            _output_types=output_types,
            **kw
        )

    @property
    def groups(self):
        return self._groups

    @property
    def groupby_params(self):
        return self._groupby_params

    @classmethod
    def _set_inputs(cls, op: "GroupByConcat", inputs: List[EntityData]):
        super()._set_inputs(op, inputs)
        inputs_iter = iter(op._inputs)

        new_groups = []
        for _ in op._groups:
            new_groups.append(next(inputs_iter))
        op._groups = new_groups

        if isinstance(op._groupby_params["by"], list):
            by = []
            for v in op._groupby_params["by"]:
                if isinstance(v, ENTITY_TYPE):
                    by.append(next(inputs_iter))
                else:
                    by.append(v)
            op._groupby_params["by"] = by


[docs] def concat( objs, axis=0, join="outer", ignore_index=False, keys=None, levels=None, names=None, verify_integrity=False, sort=False, copy=True, default_index_type=None, ): """ Concatenate dataframe objects along a particular axis with optional set logic along the other axes. Can also add a layer of hierarchical indexing on the concatenation axis, which may be useful if the labels are the same (or overlapping) on the passed axis number. Parameters ---------- objs : a sequence or mapping of Series or DataFrame objects If a mapping is passed, the sorted keys will be used as the `keys` argument, unless it is passed, in which case the values will be selected (see below). Any None objects will be dropped silently unless they are all None in which case a ValueError will be raised. axis : {0/'index', 1/'columns'}, default 0 The axis to concatenate along. join : {'inner', 'outer'}, default 'outer' How to handle indexes on other axis (or axes). ignore_index : bool, default False If True, do not use the index values along the concatenation axis. The resulting axis will be labeled 0, ..., n - 1. This is useful if you are concatenating objects where the concatenation axis does not have meaningful indexing information. Note the index values on the other axes are still respected in the join. keys : sequence, default None If multiple levels passed, should contain tuples. Construct hierarchical index using the passed keys as the outermost level. levels : list of sequences, default None Specific levels (unique values) to use for constructing a MultiIndex. Otherwise they will be inferred from the keys. names : list, default None Names for the levels in the resulting hierarchical index. verify_integrity : bool, default False Check whether the new concatenated axis contains duplicates. This can be very expensive relative to the actual data concatenation. sort : bool, default False Sort non-concatenation axis if it is not already aligned when `join` is 'outer'. This has no effect when ``join='inner'``, which already preserves the order of the non-concatenation axis. copy : bool, default True If False, do not copy data unnecessarily. Returns ------- object, type of objs When concatenating all ``Series`` along the index (axis=0), a ``Series`` is returned. When ``objs`` contains at least one ``DataFrame``, a ``DataFrame`` is returned. When concatenating along the columns (axis=1), a ``DataFrame`` is returned. See Also -------- Series.append : Concatenate Series. DataFrame.append : Concatenate DataFrames. DataFrame.join : Join DataFrames using indexes. DataFrame.merge : Merge DataFrames by indexes or columns. Notes ----- The keys, levels, and names arguments are all optional. A walkthrough of how this method fits in with other tools for combining pandas objects can be found `here <https://pandas.pydata.org/pandas-docs/stable/user_guide/merging.html>`__. Examples -------- Combine two ``Series``. >>> import maxframe.dataframe as md >>> s1 = md.Series(['a', 'b']) >>> s2 = md.Series(['c', 'd']) >>> md.concat([s1, s2]).execute() 0 a 1 b 0 c 1 d dtype: object Clear the existing index and reset it in the result by setting the ``ignore_index`` option to ``True``. >>> md.concat([s1, s2], ignore_index=True).execute() 0 a 1 b 2 c 3 d dtype: object Add a hierarchical index at the outermost level of the data with the ``keys`` option. >>> md.concat([s1, s2], keys=['s1', 's2']).execute() s1 0 a 1 b s2 0 c 1 d dtype: object Label the index keys you create with the ``names`` option. >>> md.concat([s1, s2], keys=['s1', 's2'], ... names=['Series name', 'Row ID']).execute() Series name Row ID s1 0 a 1 b s2 0 c 1 d dtype: object Combine two ``DataFrame`` objects with identical columns. >>> df1 = md.DataFrame([['a', 1], ['b', 2]], ... columns=['letter', 'number']) >>> df1.execute() letter number 0 a 1 1 b 2 >>> df2 = md.DataFrame([['c', 3], ['d', 4]], ... columns=['letter', 'number']) >>> df2.execute() letter number 0 c 3 1 d 4 >>> md.concat([df1, df2]).execute() letter number 0 a 1 1 b 2 0 c 3 1 d 4 Combine ``DataFrame`` objects with overlapping columns and return everything. Columns outside the intersection will be filled with ``NaN`` values. >>> df3 = md.DataFrame([['c', 3, 'cat'], ['d', 4, 'dog']], ... columns=['letter', 'number', 'animal']) >>> df3.execute() letter number animal 0 c 3 cat 1 d 4 dog >>> md.concat([df1, df3], sort=False).execute() letter number animal 0 a 1 NaN 1 b 2 NaN 0 c 3 cat 1 d 4 dog Combine ``DataFrame`` objects with overlapping columns and return only those that are shared by passing ``inner`` to the ``join`` keyword argument. >>> md.concat([df1, df3], join="inner").execute() letter number 0 a 1 1 b 2 0 c 3 1 d 4 Combine ``DataFrame`` objects horizontally along the x axis by passing in ``axis=1``. >>> df4 = md.DataFrame([['bird', 'polly'], ['monkey', 'george']], ... columns=['animal', 'name']) >>> md.concat([df1, df4], axis=1).execute() letter number animal name 0 a 1 bird polly 1 b 2 monkey george Prevent the result from including duplicate index values with the ``verify_integrity`` option. >>> df5 = md.DataFrame([1], index=['a']) >>> df5.execute() 0 a 1 >>> df6 = md.DataFrame([2], index=['a']) >>> df6.execute() 0 a 2 """ if not isinstance(objs, (list, tuple)): # pragma: no cover raise TypeError( "first argument must be an iterable of dataframe or series objects" ) axis = validate_axis(axis) if isinstance(objs, dict): # pragma: no cover keys = objs.keys() objs = objs.values() if axis == 1 and join == "inner": # pragma: no cover raise NotImplementedError("inner join is not support when specify `axis=1`") if verify_integrity or sort or keys: # pragma: no cover raise NotImplementedError( "verify_integrity, sort, keys arguments are not supported now" ) default_index_type = validate_default_index_type(default_index_type) op = DataFrameConcat( axis=axis, join=join, ignore_index=ignore_index, keys=keys, levels=levels, names=names, verify_integrity=verify_integrity, sort=sort, copy=copy, default_index_type=default_index_type, ) return op(objs)