Source code for maxframe.dataframe.initializer

# Copyright 1999-2026 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Union

import pandas as pd
import pyarrow as pa
from pandas.api.types import is_list_like
from pandas.core.dtypes.common import pandas_dtype

from maxframe.config import options
from maxframe.core import ENTITY_TYPE
from maxframe.dataframe.core import DATAFRAME_TYPE, INDEX_TYPE, SERIES_TYPE
from maxframe.dataframe.core import DataFrame as _Frame
from maxframe.dataframe.core import Index as _Index
from maxframe.dataframe.core import Series as _Series
from maxframe.dataframe.datasource.dataframe import from_pandas as from_pandas_df
from maxframe.dataframe.datasource.from_tensor import (
    dataframe_from_1d_tileables,
    dataframe_from_tensor,
    series_from_tensor,
)
from maxframe.dataframe.datasource.index import from_pandas as from_pandas_index
from maxframe.dataframe.datasource.index import from_tileable as from_tileable_index
from maxframe.dataframe.datasource.series import from_pandas as from_pandas_series
from maxframe.dataframe.utils import is_cudf, is_index, validate_dtype_backend
from maxframe.lib.dtypes_extension import ArrowDtype
from maxframe.serialization.serializables import SerializableMeta
from maxframe.tensor import stack
from maxframe.tensor import tensor as astensor
from maxframe.tensor.array_utils import is_cupy
from maxframe.tensor.core import TENSOR_TYPE
from maxframe.utils import ceildiv, is_arrow_dtype_supported, lazy_import

cudf = lazy_import("cudf")
_ARROW_SAMPLE_SIZE = 100


def _convert_to_arrow_dtype(obj):
    """Convert pandas object to use ArrowDtype"""
    if not is_arrow_dtype_supported():
        return obj

    if isinstance(obj, pd.DataFrame):
        # Handle duplicate column names
        has_duplicates = obj.columns.duplicated().any()

        if has_duplicates:
            # For DataFrames with duplicate columns, convert each column individually
            converted_cols = []
            for i in range(len(obj.columns)):
                col_data = obj.iloc[:_ARROW_SAMPLE_SIZE, i]
                arr = pa.array(col_data)
                arrow_type = arr.type
                converted_col = col_data.astype(ArrowDtype(arrow_type))
                converted_col.name = obj.columns[i]
                converted_cols.append(converted_col)

            # Reconstruct DataFrame using concat to preserve dtypes and column names
            result_df = pd.concat(converted_cols, axis=1)
            result_df.index = obj.index
            return result_df
        else:
            # Use pyarrow to infer schema for DataFrames with unique columns
            table = pa.Table.from_pandas(obj.iloc[:_ARROW_SAMPLE_SIZE])
            # Build dtype mapping
            dtype_map = {}
            for i, col in enumerate(obj.columns):
                arrow_type = table.schema.field(i).type
                dtype_map[col] = ArrowDtype(arrow_type)
            return obj.astype(dtype_map)
    elif isinstance(obj, pd.Series):
        # Use pyarrow to infer type
        arr = pa.array(obj.iloc[:_ARROW_SAMPLE_SIZE])
        arrow_type = arr.type
        return obj.astype(ArrowDtype(arrow_type))
    elif isinstance(obj, pd.MultiIndex):
        # For MultiIndex, convert each level separately
        converted_levels = []
        for i in range(obj.nlevels):
            level_values = obj.get_level_values(i)
            arr = pa.array(level_values[:_ARROW_SAMPLE_SIZE])
            arrow_type = arr.type
            converted_level = pd.Index(level_values, name=level_values.name).astype(
                ArrowDtype(arrow_type)
            )
            converted_levels.append(converted_level)
        return pd.MultiIndex.from_arrays(converted_levels, names=obj.names)
    elif isinstance(obj, pd.Index):
        # Use pyarrow to infer type for regular Index
        arr = pa.array(obj[:_ARROW_SAMPLE_SIZE])
        arrow_type = arr.type
        return obj.astype(ArrowDtype(arrow_type))
    return obj


class InitializerMeta(SerializableMeta):
    def __instancecheck__(cls, instance):
        return isinstance(instance, (cls.__base__,) + getattr(cls, "_allow_data_type_"))


[docs] class DataFrame(_Frame, metaclass=InitializerMeta):
[docs] def __init__( self, data=None, index=None, columns=None, dtype=None, copy=False, chunk_size=None, gpu=None, sparse=None, num_partitions=None, dtype_backend=None, ): dtype_backend = validate_dtype_backend( dtype_backend or options.dataframe.dtype_backend ) need_repart = False if columns is not None and not is_list_like(columns): raise ValueError("columns must be a list-like object") if isinstance(data, TENSOR_TYPE): if chunk_size is not None: data = data.rechunk(chunk_size) df = dataframe_from_tensor( data, index=index, columns=columns, gpu=gpu, sparse=sparse ) need_repart = num_partitions is not None elif isinstance(data, SERIES_TYPE): if columns is not None and len(columns) != 1: raise ValueError("columns' length must be 1 when data is Series") col_name = columns[0] if columns else None df = data.to_frame(name=col_name) need_repart = num_partitions is not None elif isinstance(data, DATAFRAME_TYPE): if not hasattr(data, "data"): # DataFrameData df = _Frame(data) else: df = data if columns is not None: if len(df.columns) != len(columns): raise ValueError("columns' length must be equal to the data's") df.columns = columns need_repart = num_partitions is not None elif isinstance(data, dict) and self._can_process_by_1d_tileables(data): # data is a dict and some value is tensor df = dataframe_from_1d_tileables( data, index=index, columns=columns, gpu=gpu, sparse=sparse ) need_repart = num_partitions is not None elif isinstance(data, list) and any(isinstance(v, ENTITY_TYPE) for v in data): # stack data together data = stack(data) df = dataframe_from_tensor( data, index=index, columns=columns, gpu=gpu, sparse=sparse ) need_repart = num_partitions is not None elif isinstance(index, (INDEX_TYPE, SERIES_TYPE)): if isinstance(data, dict): data = {k: astensor(v, chunk_size=chunk_size) for k, v in data.items()} df = dataframe_from_1d_tileables( data, index=index, columns=columns, gpu=gpu, sparse=sparse ) else: if data is not None: data = astensor(data, chunk_size=chunk_size) df = dataframe_from_tensor( data, index=index, columns=columns, gpu=gpu, sparse=sparse ) need_repart = num_partitions is not None else: if is_cudf(data) or is_cupy(data): # pragma: no cover pdf = cudf.DataFrame(data, index=index, columns=columns, dtype=dtype) if copy: pdf = pdf.copy() else: pdf = pd.DataFrame( data, index=index, columns=columns, dtype=dtype, copy=copy ) # Apply dtype_backend conversion for pandas DataFrame if dtype_backend == "pyarrow": pdf = _convert_to_arrow_dtype(pdf) if num_partitions is not None: chunk_size = ceildiv(len(pdf), num_partitions) df = from_pandas_df(pdf, chunk_size=chunk_size, gpu=gpu, sparse=sparse) if need_repart: df = df.rebalance(num_partitions=num_partitions) super().__init__(df.data)
@classmethod def _can_process_by_1d_tileables(cls, data: dict): for value in data.values(): if isinstance(value, ENTITY_TYPE): return True elif isinstance(value, (list, tuple)) and any( isinstance(v, ENTITY_TYPE) for v in value ): return True return False
[docs] class Series(_Series, metaclass=InitializerMeta):
[docs] def __init__( self, data=None, index=None, dtype=None, name=None, copy=False, chunk_size=None, gpu=None, sparse=None, num_partitions=None, dtype_backend=None, ): dtype_backend = validate_dtype_backend( dtype_backend or options.dataframe.dtype_backend ) if dtype is not None: dtype = pandas_dtype(dtype) need_repart = False if isinstance(data, (TENSOR_TYPE, INDEX_TYPE)): if chunk_size is not None: data = data.rechunk(chunk_size) name = name or getattr(data, "name", None) series = series_from_tensor( data, index=index, name=name, gpu=gpu, sparse=sparse ) need_repart = num_partitions is not None elif isinstance(index, INDEX_TYPE): if data is not None: data = astensor(data, chunk_size=chunk_size) series = series_from_tensor( data, index=index, name=name, dtype=dtype, gpu=gpu, sparse=sparse ) need_repart = num_partitions is not None elif isinstance(data, SERIES_TYPE): if not hasattr(data, "data"): # SeriesData series = _Series(data) else: series = data need_repart = num_partitions is not None else: if is_cudf(data) or is_cupy(data): # pragma: no cover pd_series = cudf.Series(data, index=index, dtype=dtype, name=name) if copy: pd_series = pd_series.copy() else: pd_series = pd.Series( data, index=index, dtype=dtype, name=name, copy=copy ) # Apply dtype_backend conversion for pandas Series if dtype_backend == "pyarrow": pd_series = _convert_to_arrow_dtype(pd_series) if num_partitions is not None: chunk_size = ceildiv(len(pd_series), num_partitions) series = from_pandas_series( pd_series, chunk_size=chunk_size, gpu=gpu, sparse=sparse ) if need_repart: series = series.rebalance(num_partitions=num_partitions) super().__init__(series.data)
[docs] class Index(_Index, metaclass=InitializerMeta): def __new__(cls, data, **_): # just return cls always until we support other Index's initializers return object.__new__(cls)
[docs] def __init__( self, data=None, dtype=None, copy=False, name=None, tupleize_cols=True, chunk_size=None, gpu=None, sparse=None, names=None, num_partitions=None, store_data=False, dtype_backend=None, ): dtype_backend = validate_dtype_backend( dtype_backend or options.dataframe.dtype_backend ) need_repart = False if isinstance(data, INDEX_TYPE): if not hasattr(data, "data"): # IndexData index = _Index(data) else: index = data need_repart = num_partitions is not None else: if isinstance(data, ENTITY_TYPE): name = name if name is not None else getattr(data, "name", None) index = from_tileable_index(data, dtype=dtype, name=name, names=names) need_repart = num_partitions is not None else: if not is_index(data): name = name if name is not None else getattr(data, "name", None) xdf = cudf if is_cudf(data) or is_cupy(data) else pd try: pd_index = xdf.Index( data=data, dtype=dtype, copy=copy, name=name, tupleize_cols=tupleize_cols, ) except TypeError: # pragma: no cover pd_index = xdf.Index( data=data, dtype=dtype, copy=copy, name=name ) # Apply dtype_backend conversion for pandas Index if xdf is pd and dtype_backend == "pyarrow": pd_index = _convert_to_arrow_dtype(pd_index) else: pd_index = data # Apply dtype_backend conversion for existing pandas Index/MultiIndex if isinstance(pd_index, pd.Index) and dtype_backend == "pyarrow": pd_index = _convert_to_arrow_dtype(pd_index) if num_partitions is not None: chunk_size = ceildiv(len(pd_index), num_partitions) index = from_pandas_index( pd_index, chunk_size=chunk_size, gpu=gpu, sparse=sparse, store_data=store_data, ) if need_repart: index = index.rebalance(num_partitions=num_partitions) super().__init__(index.data)
_pd_type_mapping = { pd.DataFrame: DataFrame, pd.Series: Series, pd.Index: Index, }
[docs] def read_pandas( data: Union[pd.DataFrame, pd.Series, pd.Index], **kwargs ) -> Union[DataFrame, Series, Index]: """ Create MaxFrame objects from pandas. Parameters ---------- data: Union[pd.DataFrame, pd.Series, pd.Index] pandas data kwargs: dict arguments to be passed to initializers. Returns ------- result: Union[DataFrame, Series, Index] result MaxFrame object """ for pd_cls, cls in _pd_type_mapping.items(): if isinstance(data, pd_cls): return cls(data, **kwargs) raise ValueError(f"Type {type(data)} not supported")