# Copyright 1999-2026 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Union
import pandas as pd
import pyarrow as pa
from pandas.api.types import is_list_like
from pandas.core.dtypes.common import pandas_dtype
from maxframe.config import options
from maxframe.core import ENTITY_TYPE
from maxframe.dataframe.core import DATAFRAME_TYPE, INDEX_TYPE, SERIES_TYPE
from maxframe.dataframe.core import DataFrame as _Frame
from maxframe.dataframe.core import Index as _Index
from maxframe.dataframe.core import Series as _Series
from maxframe.dataframe.datasource.dataframe import from_pandas as from_pandas_df
from maxframe.dataframe.datasource.from_tensor import (
dataframe_from_1d_tileables,
dataframe_from_tensor,
series_from_tensor,
)
from maxframe.dataframe.datasource.index import from_pandas as from_pandas_index
from maxframe.dataframe.datasource.index import from_tileable as from_tileable_index
from maxframe.dataframe.datasource.series import from_pandas as from_pandas_series
from maxframe.dataframe.utils import is_cudf, is_index, validate_dtype_backend
from maxframe.lib.dtypes_extension import ArrowDtype
from maxframe.serialization.serializables import SerializableMeta
from maxframe.tensor import stack
from maxframe.tensor import tensor as astensor
from maxframe.tensor.array_utils import is_cupy
from maxframe.tensor.core import TENSOR_TYPE
from maxframe.utils import ceildiv, is_arrow_dtype_supported, lazy_import
# Handle to the optional ``cudf`` package obtained through ``lazy_import``; in the
# code visible here it is only used on the cudf/cupy branches of the initializers.
cudf = lazy_import("cudf")
# Upper bound on the number of leading rows/elements handed to pyarrow when
# ``_convert_to_arrow_dtype`` infers Arrow types.
_ARROW_SAMPLE_SIZE = 100
def _convert_to_arrow_dtype(obj):
    """Convert a pandas object so that its data is backed by ``ArrowDtype``.

    The Arrow type of every column / level is inferred by pyarrow from at most
    the first ``_ARROW_SAMPLE_SIZE`` entries; the *complete* object is then cast
    to the inferred type(s).

    Parameters
    ----------
    obj
        A pandas ``DataFrame``, ``Series``, ``MultiIndex`` or ``Index``.
        Objects of any other type are returned unchanged.

    Returns
    -------
    The converted object, or ``obj`` itself when ``is_arrow_dtype_supported()``
    is false or ``obj`` is not one of the handled pandas types.
    """
    if not is_arrow_dtype_supported():
        return obj

    if isinstance(obj, pd.DataFrame):
        if obj.columns.duplicated().any():
            # Label-based access is ambiguous with duplicate names, so every
            # column is addressed by position and converted on its own.
            converted_cols = []
            for pos, label in enumerate(obj.columns):
                column = obj.iloc[:, pos]
                # Only the leading sample is used for type inference; the cast
                # itself must cover the whole column, otherwise rows beyond the
                # sample would be dropped.
                arrow_type = pa.array(column.iloc[:_ARROW_SAMPLE_SIZE]).type
                converted = column.astype(ArrowDtype(arrow_type))
                converted.name = label
                converted_cols.append(converted)
            # concat keeps the per-column dtypes intact
            result_df = pd.concat(converted_cols, axis=1)
            result_df.index = obj.index
            # restore the original column index (its name and exact type)
            result_df.columns = obj.columns
            return result_df

        # Unique column names: let pyarrow infer the whole schema at once.
        # Only column types are needed, so the row index is not converted.
        sample_table = pa.Table.from_pandas(
            obj.iloc[:_ARROW_SAMPLE_SIZE], preserve_index=False
        )
        dtype_map = {
            label: ArrowDtype(sample_table.schema.field(pos).type)
            for pos, label in enumerate(obj.columns)
        }
        return obj.astype(dtype_map)

    if isinstance(obj, pd.Series):
        arrow_type = pa.array(obj.iloc[:_ARROW_SAMPLE_SIZE]).type
        return obj.astype(ArrowDtype(arrow_type))

    # MultiIndex must be tested before Index because it is an Index subclass.
    if isinstance(obj, pd.MultiIndex):
        converted_levels = []
        for level_no in range(obj.nlevels):
            level_values = obj.get_level_values(level_no)
            arrow_type = pa.array(level_values[:_ARROW_SAMPLE_SIZE]).type
            converted_levels.append(
                pd.Index(level_values, name=level_values.name).astype(
                    ArrowDtype(arrow_type)
                )
            )
        return pd.MultiIndex.from_arrays(converted_levels, names=obj.names)

    if isinstance(obj, pd.Index):
        arrow_type = pa.array(obj[:_ARROW_SAMPLE_SIZE]).type
        return obj.astype(ArrowDtype(arrow_type))

    return obj
class InitializerMeta(SerializableMeta):
    """Metaclass that widens ``isinstance`` checks for the initializer classes.

    An object counts as an instance when it is an instance of the class's
    first base (``cls.__base__``) or of any type listed in the class's
    ``_allow_data_type_`` attribute.
    """

    def __instancecheck__(cls, instance):
        accepted_types = (cls.__base__,) + getattr(cls, "_allow_data_type_")
        return isinstance(instance, accepted_types)
[docs]
class DataFrame(_Frame, metaclass=InitializerMeta):
    """DataFrame initializer that accepts tensors, MaxFrame objects or raw data."""

    def __init__(
        self,
        data=None,
        index=None,
        columns=None,
        dtype=None,
        copy=False,
        chunk_size=None,
        gpu=None,
        sparse=None,
        num_partitions=None,
        dtype_backend=None,
    ):
        """Build a DataFrame from ``data``.

        Parameters
        ----------
        data
            A tensor, a MaxFrame Series/DataFrame, a dict or list containing
            MaxFrame entities, or anything accepted by the pandas (or cudf)
            ``DataFrame`` constructor.
        index
            Index for the result. When it is a MaxFrame Index/Series and
            ``data`` is raw, ``data`` is first converted to tensor(s).
        columns
            List-like of column labels.
        dtype, copy
            Forwarded to the pandas / cudf ``DataFrame`` constructor; only used
            on the raw-data path.
        chunk_size
            Chunk size used when rechunking a tensor, converting raw data to
            tensors, or splitting a locally built pandas object.
        gpu, sparse
            Forwarded to the underlying data source functions.
        num_partitions
            On the raw-data path it is used to derive ``chunk_size``; on every
            other path the result is rebalanced to this many partitions.
        dtype_backend
            Falls back to ``options.dataframe.dtype_backend`` when not given.
            With ``"pyarrow"`` a locally built pandas DataFrame is converted
            through ``_convert_to_arrow_dtype``.

        Raises
        ------
        ValueError
            If ``columns`` is not list-like, or its length does not fit
            ``data`` (must be 1 for a Series, must equal the number of columns
            for a DataFrame).
        """
        dtype_backend = validate_dtype_backend(
            dtype_backend or options.dataframe.dtype_backend
        )
        need_repart = False
        if columns is not None and not is_list_like(columns):
            raise ValueError("columns must be a list-like object")

        if isinstance(data, TENSOR_TYPE):
            if chunk_size is not None:
                data = data.rechunk(chunk_size)
            df = dataframe_from_tensor(
                data, index=index, columns=columns, gpu=gpu, sparse=sparse
            )
            need_repart = num_partitions is not None
        elif isinstance(data, SERIES_TYPE):
            if columns is not None and len(columns) != 1:
                raise ValueError("columns' length must be 1 when data is Series")
            # Compare against None instead of truth-testing: ``columns`` may be a
            # pandas Index / ndarray (ambiguous truth value) or hold a single
            # falsy label such as 0.
            col_name = columns[0] if columns is not None else None
            df = data.to_frame(name=col_name)
            need_repart = num_partitions is not None
        elif isinstance(data, DATAFRAME_TYPE):
            if not hasattr(data, "data"):
                # DataFrameData
                df = _Frame(data)
            else:
                df = data
            if columns is not None:
                if len(df.columns) != len(columns):
                    raise ValueError("columns' length must be equal to the data's")
                # NOTE(review): when ``data`` is used as-is, this assignment is
                # made on the caller's object as well - confirm this is intended.
                df.columns = columns
            need_repart = num_partitions is not None
        elif isinstance(data, dict) and self._can_process_by_1d_tileables(data):
            # data is a dict and some value is tensor
            df = dataframe_from_1d_tileables(
                data, index=index, columns=columns, gpu=gpu, sparse=sparse
            )
            need_repart = num_partitions is not None
        elif isinstance(data, list) and any(isinstance(v, ENTITY_TYPE) for v in data):
            # stack data together
            data = stack(data)
            df = dataframe_from_tensor(
                data, index=index, columns=columns, gpu=gpu, sparse=sparse
            )
            need_repart = num_partitions is not None
        elif isinstance(index, (INDEX_TYPE, SERIES_TYPE)):
            # Raw data combined with a MaxFrame index: turn the data into
            # tensor(s) first so both sides are tileables.
            if isinstance(data, dict):
                data = {k: astensor(v, chunk_size=chunk_size) for k, v in data.items()}
                df = dataframe_from_1d_tileables(
                    data, index=index, columns=columns, gpu=gpu, sparse=sparse
                )
            else:
                if data is not None:
                    data = astensor(data, chunk_size=chunk_size)
                df = dataframe_from_tensor(
                    data, index=index, columns=columns, gpu=gpu, sparse=sparse
                )
            need_repart = num_partitions is not None
        else:
            if is_cudf(data) or is_cupy(data):  # pragma: no cover
                pdf = cudf.DataFrame(data, index=index, columns=columns, dtype=dtype)
                if copy:
                    pdf = pdf.copy()
            else:
                pdf = pd.DataFrame(
                    data, index=index, columns=columns, dtype=dtype, copy=copy
                )
            # Apply dtype_backend conversion for pandas DataFrame
            if dtype_backend == "pyarrow":
                pdf = _convert_to_arrow_dtype(pdf)
            # The local object's length is known, so the partition count is
            # translated into a chunk size instead of rebalancing afterwards.
            if num_partitions is not None:
                chunk_size = ceildiv(len(pdf), num_partitions)
            df = from_pandas_df(pdf, chunk_size=chunk_size, gpu=gpu, sparse=sparse)

        if need_repart:
            df = df.rebalance(num_partitions=num_partitions)
        super().__init__(df.data)

    @classmethod
    def _can_process_by_1d_tileables(cls, data: dict):
        """Return True when any dict value is an entity or a list/tuple holding one."""
        for value in data.values():
            if isinstance(value, ENTITY_TYPE):
                return True
            if isinstance(value, (list, tuple)) and any(
                isinstance(v, ENTITY_TYPE) for v in value
            ):
                return True
        return False
[docs]
class Series(_Series, metaclass=InitializerMeta):
    """Series initializer that accepts tensors, MaxFrame objects or raw data."""

    def __init__(
        self,
        data=None,
        index=None,
        dtype=None,
        name=None,
        copy=False,
        chunk_size=None,
        gpu=None,
        sparse=None,
        num_partitions=None,
        dtype_backend=None,
    ):
        """Build a Series from ``data``.

        Parameters
        ----------
        data
            A tensor, a MaxFrame Index or Series, or anything accepted by the
            pandas (or cudf) ``Series`` constructor.
        index
            Index for the result. When it is a MaxFrame Index and ``data`` is
            raw, ``data`` is first converted to a tensor.
        dtype
            Normalized with ``pandas_dtype`` when given.
        name
            Name of the Series. For tensor / Index input an explicit ``name``
            (including falsy values such as ``0``) wins over ``data.name``.
        copy
            Forwarded to the pandas constructor (cudf objects are copied
            explicitly); only used on the raw-data path.
        chunk_size
            Chunk size used when rechunking a tensor, converting raw data to a
            tensor, or splitting a locally built pandas object.
        gpu, sparse
            Forwarded to the underlying data source functions.
        num_partitions
            On the raw-data path it is used to derive ``chunk_size``; on every
            other path the result is rebalanced to this many partitions.
        dtype_backend
            Falls back to ``options.dataframe.dtype_backend`` when not given.
            With ``"pyarrow"`` a locally built pandas Series is converted
            through ``_convert_to_arrow_dtype``.
        """
        dtype_backend = validate_dtype_backend(
            dtype_backend or options.dataframe.dtype_backend
        )
        if dtype is not None:
            dtype = pandas_dtype(dtype)
        need_repart = False

        if isinstance(data, (TENSOR_TYPE, INDEX_TYPE)):
            if chunk_size is not None:
                data = data.rechunk(chunk_size)
            # Only fall back to the source's name when none was supplied; a
            # truthiness test would throw away valid names such as 0 or "".
            if name is None:
                name = getattr(data, "name", None)
            # NOTE(review): ``dtype`` is not forwarded on this path although it
            # is on the MaxFrame-index path below - confirm whether intended.
            series = series_from_tensor(
                data, index=index, name=name, gpu=gpu, sparse=sparse
            )
            need_repart = num_partitions is not None
        elif isinstance(index, INDEX_TYPE):
            if data is not None:
                data = astensor(data, chunk_size=chunk_size)
            series = series_from_tensor(
                data, index=index, name=name, dtype=dtype, gpu=gpu, sparse=sparse
            )
            need_repart = num_partitions is not None
        elif isinstance(data, SERIES_TYPE):
            if not hasattr(data, "data"):
                # SeriesData
                series = _Series(data)
            else:
                series = data
            need_repart = num_partitions is not None
        else:
            if is_cudf(data) or is_cupy(data):  # pragma: no cover
                pd_series = cudf.Series(data, index=index, dtype=dtype, name=name)
                if copy:
                    pd_series = pd_series.copy()
            else:
                pd_series = pd.Series(
                    data, index=index, dtype=dtype, name=name, copy=copy
                )
            # Apply dtype_backend conversion for pandas Series
            if dtype_backend == "pyarrow":
                pd_series = _convert_to_arrow_dtype(pd_series)
            # The local object's length is known, so the partition count is
            # translated into a chunk size instead of rebalancing afterwards.
            if num_partitions is not None:
                chunk_size = ceildiv(len(pd_series), num_partitions)
            series = from_pandas_series(
                pd_series, chunk_size=chunk_size, gpu=gpu, sparse=sparse
            )

        if need_repart:
            series = series.rebalance(num_partitions=num_partitions)
        super().__init__(series.data)
[docs]
class Index(_Index, metaclass=InitializerMeta):
    """Index initializer that accepts MaxFrame objects or raw data."""

    def __new__(cls, *_, **__):
        # just return cls always until we support other Index's initializers.
        # Any positional / keyword arguments are accepted here so that every
        # call form allowed by ``__init__`` reaches it.
        return object.__new__(cls)

    def __init__(
        self,
        data=None,
        dtype=None,
        copy=False,
        name=None,
        tupleize_cols=True,
        chunk_size=None,
        gpu=None,
        sparse=None,
        names=None,
        num_partitions=None,
        store_data=False,
        dtype_backend=None,
    ):
        """Build an Index from ``data``.

        Parameters
        ----------
        data
            A MaxFrame Index, another MaxFrame entity, an existing pandas /
            cudf index, or raw data for the pandas (or cudf) ``Index``
            constructor.
        dtype, copy, tupleize_cols
            Forwarded to the pandas / cudf ``Index`` constructor when raw data
            is given; ``dtype`` is also forwarded for entity input.
        name
            Name of the index; defaults to ``data.name`` when available.
        names
            Forwarded together with ``name`` when ``data`` is a MaxFrame entity.
        chunk_size
            Chunk size used when splitting a local pandas / cudf index.
        gpu, sparse, store_data
            Forwarded to ``from_pandas_index`` on the local-data path.
        num_partitions
            On the local-data path it is used to derive ``chunk_size``; on the
            other paths the result is rebalanced to this many partitions.
        dtype_backend
            Falls back to ``options.dataframe.dtype_backend`` when not given.
            With ``"pyarrow"`` a local pandas Index / MultiIndex is converted
            through ``_convert_to_arrow_dtype``.
        """
        dtype_backend = validate_dtype_backend(
            dtype_backend or options.dataframe.dtype_backend
        )
        need_repart = False

        if isinstance(data, INDEX_TYPE):
            if not hasattr(data, "data"):
                # IndexData
                index = _Index(data)
            else:
                index = data
            need_repart = num_partitions is not None
        elif isinstance(data, ENTITY_TYPE):
            name = name if name is not None else getattr(data, "name", None)
            index = from_tileable_index(data, dtype=dtype, name=name, names=names)
            need_repart = num_partitions is not None
        else:
            if is_index(data):
                # already a pandas / cudf index, use it directly
                pd_index = data
            else:
                name = name if name is not None else getattr(data, "name", None)
                xdf = cudf if is_cudf(data) or is_cupy(data) else pd
                try:
                    pd_index = xdf.Index(
                        data=data,
                        dtype=dtype,
                        copy=copy,
                        name=name,
                        tupleize_cols=tupleize_cols,
                    )
                except TypeError:  # pragma: no cover
                    # retry without ``tupleize_cols`` for constructors that
                    # reject that keyword
                    pd_index = xdf.Index(
                        data=data, dtype=dtype, copy=copy, name=name
                    )
            # Apply dtype_backend conversion for pandas Index/MultiIndex only;
            # cudf indexes are not pandas instances and are left untouched.
            if isinstance(pd_index, pd.Index) and dtype_backend == "pyarrow":
                pd_index = _convert_to_arrow_dtype(pd_index)
            # The local object's length is known, so the partition count is
            # translated into a chunk size instead of rebalancing afterwards.
            if num_partitions is not None:
                chunk_size = ceildiv(len(pd_index), num_partitions)
            index = from_pandas_index(
                pd_index,
                chunk_size=chunk_size,
                gpu=gpu,
                sparse=sparse,
                store_data=store_data,
            )

        if need_repart:
            index = index.rebalance(num_partitions=num_partitions)
        super().__init__(index.data)
# Lookup from pandas container type to the initializer class defined in this
# module; ``read_pandas`` iterates it and uses the first ``isinstance`` match.
_pd_type_mapping = {
pd.DataFrame: DataFrame,
pd.Series: Series,
pd.Index: Index,
}
[docs]
def read_pandas(
    data: Union[pd.DataFrame, pd.Series, pd.Index], **kwargs
) -> Union[DataFrame, Series, Index]:
    """
    Create MaxFrame objects from pandas.

    Parameters
    ----------
    data: Union[pd.DataFrame, pd.Series, pd.Index]
        pandas object to wrap
    kwargs: dict
        extra keyword arguments handed to the matching initializer.

    Returns
    -------
    result: Union[DataFrame, Series, Index]
        MaxFrame object built from ``data``

    Raises
    ------
    ValueError
        when ``data`` is not an instance of any supported pandas type.
    """
    # pick the initializer of the first pandas type that ``data`` is an instance of
    initializer = next(
        (
            mf_cls
            for pandas_cls, mf_cls in _pd_type_mapping.items()
            if isinstance(data, pandas_cls)
        ),
        None,
    )
    if initializer is None:
        raise ValueError(f"Type {type(data)} not supported")
    return initializer(data, **kwargs)