Source code for maxframe.dataframe.typing_

# Copyright 1999-2026 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import dataclasses
import functools
import inspect
from typing import Any, Callable, Generic, List, Optional, TypeVar, Union

import numpy as np
import pandas as pd
import pyarrow as pa

from maxframe.core import OutputType
from maxframe.dataframe.type_infer import InferredDataFrameMeta
from maxframe.dataframe.utils import parse_index
from maxframe.lib.dtypes_extension import ArrowDtype, ExternalBlobDtype
from maxframe.lib.dtypes_extension.blob import AbstractExternalBlob
from maxframe.typing_ import PandasDType
from maxframe.utils import arrow_type_from_str, make_dtype, wrap_arrow_dtype

try:
    from odps import types as odps_types
except ImportError:
    odps_types = None

# TypeVars
T = TypeVar("T")


@dataclasses.dataclass
class _FieldDef:
    """Name/dtype pair describing a single field taken from a type hint.

    Instances are used for index levels, DataFrame columns, the value of a
    Series and the per-element fields of a dict-style Series hint.
    """

    # Field label; None when the hint supplies a dtype without a name.
    name: Any
    # Dtype of the field; may be None (SeriesType stores a dtype-less
    # placeholder here for dict-style element hints).
    dtype: PandasDType


def _item_to_field_def(item_):
    """Convert one element of a ``__class_getitem__`` argument to a field.

    Three spellings are accepted:

    * ``(name, dtype)`` tuple -- a named field.
    * ``name: dtype`` slice (as written in ``pd.DataFrame["a": int]``) --
      a named field; a slice step is not allowed.
    * anything else -- treated as a bare dtype with no name.

    In every case the dtype part is normalized with ``make_dtype`` so that
    all spellings of the same hint produce the same ``_FieldDef``.

    Raises
    ------
    AssertionError
        If a slice with a step (``a:b:c``) is supplied.
    """
    if isinstance(item_, tuple):
        return _FieldDef(name=item_[0], dtype=make_dtype(item_[1]))

    if isinstance(item_, slice):
        assert item_.step is None, "Should not specify step when specifying type hints"
        # The slice form previously stored ``item_.stop`` untouched, so
        # ``"a": int`` kept the Python class while ``("a", int)`` produced a
        # normalized dtype. Normalize here like the other branches do.
        return _FieldDef(name=item_.start, dtype=make_dtype(item_.stop))

    return _FieldDef(name=None, dtype=make_dtype(item_))


class IndexType:
    """Type-hint object produced by subscripting ``pd.Index``.

    Holds the list of field definitions (one per index level) parsed from
    the subscript arguments.
    """

    def __init__(self, index_fields: List[_FieldDef]):
        self.index_fields = index_fields

    def __repr__(self):
        level_dtypes = [field.dtype for field in self.index_fields]
        return "IndexType({})".format(level_dtypes)

    def __reduce__(self):
        # Executors do not need the detailed field types, and unpickling
        #  must not pull in MaxFrame itself, so the hint is pickled as a
        #  lookup of the plain ``pd.Index`` class.
        return getattr, (pd, "Index")

    @classmethod
    def from_getitem_args(cls, item) -> "IndexType":
        """Build an ``IndexType`` from the argument of ``pd.Index[...]``.

        Mappings and Series are expanded into ``(name, dtype)`` pairs. A
        list, or a non-empty tuple whose first element is a slice, is
        treated as a sequence of field specifications; any other value is
        treated as one single field specification.
        """
        if isinstance(item, (dict, pd.Series)):
            item = list(item.items())

        if isinstance(item, list):
            field_specs = item
        elif item and isinstance(item, tuple) and isinstance(item[0], slice):
            field_specs = item
        else:
            # A single specification (bare dtype, slice or (name, dtype)).
            field_specs = [item]
        return IndexType([_item_to_field_def(spec) for spec in field_specs])


class SeriesType(Generic[T]):
    """Type-hint object produced by subscripting ``pd.Series``.

    Stores the optional index field definitions, the name/dtype of the
    series itself and, for dict-style hints, the per-element fields.
    """

    def __init__(
        self,
        index_fields: Optional[List[_FieldDef]],
        name_and_dtype: _FieldDef,
        element_fields: Optional[List[_FieldDef]] = None,
    ):
        self.index_fields = index_fields
        self.name_and_dtype = name_and_dtype
        # Only populated when the hint was given as a dict of element types.
        self.element_fields = element_fields

    def __repr__(self) -> str:
        if not self.element_fields:
            return f"SeriesType[{self.name_and_dtype.dtype}]"
        described = ", ".join(
            "{}: {}".format(fd.name, fd.dtype) for fd in self.element_fields
        )
        return f"SeriesType[{{{described}}}]"

    def __reduce__(self):
        # Executors do not need the detailed field types, and unpickling
        #  must not pull in MaxFrame itself, so the hint is pickled as a
        #  lookup of the plain ``pd.Series`` class.
        return getattr, (pd, "Series")

    @classmethod
    def from_getitem_args(cls, item) -> "SeriesType":
        """Build a ``SeriesType`` from the argument of ``pd.Series[...]``.

        * ``pd.Series[{name: dtype, ...}]`` -- element fields only; the
          dict must not be empty.
        * ``pd.Series[spec]`` -- value specification without index.
        * ``pd.Series[index_spec, spec]`` -- index plus value specification.

        Raises
        ------
        ValueError
            If the dict-style element annotation is empty.
        """
        args = item if isinstance(item, tuple) else (item,)

        if len(args) == 1:
            (only_arg,) = args
            if isinstance(only_arg, dict):
                # Dict form: every entry describes one element field.
                if len(only_arg) == 0:
                    raise ValueError("Element type annotation dict cannot be empty")
                elem_defs = [
                    _FieldDef(name=key, dtype=make_dtype(value))
                    for key, value in only_arg.items()
                ]
                return SeriesType(
                    index_fields=None,
                    name_and_dtype=_FieldDef(name=None, dtype=None),
                    element_fields=elem_defs,
                )
            return SeriesType(None, _item_to_field_def(only_arg))

        # Two-argument form: index specification first, value second.
        value_def = _item_to_field_def(args[1])
        index_defs = IndexType.from_getitem_args(args[0]).index_fields
        return SeriesType(index_defs, value_def)


class DataFrameType:
    """Type-hint object produced by subscripting ``pd.DataFrame``.

    Stores the optional index field definitions and the column (data)
    field definitions parsed from the subscript arguments.
    """

    def __init__(
        self,
        index_fields: Optional[List[_FieldDef]],
        data_fields: List[_FieldDef],
    ):
        self.index_fields = index_fields
        self.data_fields = data_fields

    def __repr__(self) -> str:
        return "DataFrameType[{}]".format([fd.dtype for fd in self.data_fields])

    def __reduce__(self):
        # Executors do not need the detailed field types, and unpickling
        #  must not pull in MaxFrame itself, so the hint is pickled as a
        #  lookup of the plain ``pd.DataFrame`` class.
        return getattr, (pd, "DataFrame")

    @classmethod
    def from_getitem_args(cls, item) -> "DataFrameType":
        """Build a ``DataFrameType`` from the argument of ``pd.DataFrame[...]``.

        When the first argument is a slice, all arguments are column
        specifications and no index is declared. Otherwise the last argument
        holds the column specifications and, if more than one argument is
        given, the first one holds the index specification.
        """
        args = item if isinstance(item, tuple) else (item,)

        if isinstance(args[0], slice):
            # ``pd.DataFrame["a": int, "b": str]`` -- columns only.
            column_spec, index_spec = args, None
        else:
            column_spec = args[-1]
            index_spec = args[0] if len(args) > 1 else None

        column_defs = IndexType.from_getitem_args(column_spec).index_fields
        if index_spec is None:
            return DataFrameType(None, column_defs)

        index_defs = IndexType.from_getitem_args(index_spec).index_fields
        return DataFrameType(index_defs, column_defs)


def get_function_output_meta(
    func: Callable, df_obj=None, axis: Optional[int] = None
) -> Optional[InferredDataFrameMeta]:
    """Derive output metadata from the return annotation of ``func``.

    Parameters
    ----------
    func : Callable
        Function whose ``return`` annotation is inspected.
    df_obj : optional
        Forwarded to ``parse_index`` when an index is declared in the hint.
    axis : int, optional
        Only consulted for ``SeriesType`` hints carrying element fields:
        with ``axis == 1`` such a hint is reported as a DataFrame whose
        columns are the element fields.

    Returns
    -------
    InferredDataFrameMeta or None
        ``None`` when the function cannot be introspected, has no return
        annotation, or has a scalar annotation that ``make_dtype`` cannot
        convert. Otherwise the inferred output type, index value, dtypes,
        dtype and name.
    """
    try:
        func_argspec = inspect.getfullargspec(func)
        ret_type = (func_argspec.annotations or {}).get("return")
        if ret_type is None:
            return None
    except Exception:
        # Introspection is best effort; ``Exception`` (rather than a bare
        # ``except``) keeps KeyboardInterrupt / SystemExit propagating.
        return None

    dtypes = dtype = name = None
    index_fields = None

    # Handle SeriesType with element_fields when axis=1
    if isinstance(ret_type, SeriesType) and ret_type.element_fields and axis == 1:
        # Each element field becomes one column of a DataFrame output
        output_type = OutputType.dataframe
        dtypes = pd.Series(
            [fd.dtype for fd in ret_type.element_fields],
            index=[fd.name for fd in ret_type.element_fields],
        )
        index_fields = None  # Will be inherited from input DataFrame
    elif isinstance(ret_type, DataFrameType):
        output_type = OutputType.dataframe
        dtypes = pd.Series(
            [fd.dtype for fd in ret_type.data_fields],
            index=[fd.name for fd in ret_type.data_fields],
        )
        index_fields = ret_type.index_fields
    elif isinstance(ret_type, SeriesType):
        output_type = OutputType.series
        dtype = ret_type.name_and_dtype.dtype
        name = ret_type.name_and_dtype.name
        index_fields = ret_type.index_fields
    elif isinstance(ret_type, IndexType):
        output_type = OutputType.index
        index_fields = ret_type.index_fields
    else:
        # Anything else is treated as a scalar annotation
        output_type = OutputType.scalar
        try:
            dtype = make_dtype(ret_type)
        except Exception:
            # Annotation is not something that maps to a dtype
            return None

    if index_fields is not None:
        if len(index_fields) == 1:
            mock_idx = pd.Index(
                [], dtype=index_fields[0].dtype, name=index_fields[0].name
            )
        else:
            # Build an empty frame with the declared level dtypes and turn
            # it into an empty MultiIndex carrying names and dtypes.
            col_names = [index_field.name for index_field in index_fields]
            col_dtypes = pd.Series(
                [index_field.dtype for index_field in index_fields], index=col_names
            )
            mock_df = pd.DataFrame([], columns=col_names).astype(col_dtypes)
            mock_idx = pd.MultiIndex.from_frame(mock_df)
        index_value = parse_index(mock_idx, df_obj, store_data=False)
    else:
        index_value = None

    return InferredDataFrameMeta(
        output_type=output_type,
        index_value=index_value,
        dtypes=dtypes,
        dtype=dtype,
        name=name,
    )


def _dtype(type) -> Union[pd.api.extensions.ExtensionDtype]:
    """Convert a type specification to a pandas extension dtype (uncached).

    The branches below are tried in order:

    1. ODPS ``blob`` (when ``odps`` is importable and exposes it)
       -> ``ExternalBlobDtype()``.
    2. Python classes: ``str`` -> ``pd.StringDtype("pyarrow")``,
       ``bytes`` -> wrapped ``pa.binary()``; ``int`` / ``float`` / ``bool``
       are turned into ``np.dtype`` and continue with the NumPy branch.
    3. ``str`` instance: parsed as an ODPS type name first (if ``odps`` is
       available), otherwise / on failure as an Arrow type string.
    4. ``np.dtype`` -> ``pa.from_numpy_dtype``.
    5. pandas ``ExtensionDtype``: ``ArrowDtype`` / ``ExternalBlobDtype`` are
       returned unchanged, ``StringDtype`` becomes the pyarrow-backed
       variant, others are converted through ``pyarrow_dtype`` or
       ``numpy_dtype``.
    6. ODPS ``DataType`` instance -> ``odps_type_to_arrow_type``.
    7. ``pa.DataType`` -> used as is.

    Unless a branch returns early, the resulting Arrow type is passed to
    ``wrap_arrow_dtype``.

    Raises
    ------
    TypeError
        If the input matches none of the branches, or an extension dtype
        exposes neither ``pyarrow_dtype`` nor a convertible ``numpy_dtype``.
    """
    # Imported lazily inside the function rather than at module import time.
    from maxframe.io.odpsio.schema import odps_type_to_arrow_type

    # Handle blob type special case for ODPS DataType
    if odps_types is not None and hasattr(odps_types, "blob"):
        if type is odps_types.blob or (
            isinstance(type, odps_types.DataType) and type == odps_types.blob
        ):
            return ExternalBlobDtype()

    # Handle Python built-in type classes
    # Check if type is one of the supported Python type classes directly
    if type is str:
        return pd.StringDtype("pyarrow")
    elif type is bytes:
        return wrap_arrow_dtype(pa.binary())
    elif type in (int, float, bool):
        type = np.dtype(type)
        # Fall through to np.dtype handling below

    # Convert to PyArrow type based on input type
    if isinstance(type, str):
        # String type representation
        # First, try to parse as ODPS type if odps is available
        if odps_types is not None:
            try:
                odps_type = odps_types.validate_data_type(type)
                # Check if it's blob type
                if hasattr(odps_types, "blob") and odps_type == odps_types.blob:
                    return ExternalBlobDtype()
                # Convert ODPS type to Arrow type
                arrow_type = odps_type_to_arrow_type(odps_type, "column")
            except (ValueError, AttributeError):
                # Not an ODPS type, try Arrow type string
                arrow_type = arrow_type_from_str(type)
        else:
            # odps not installed, use Arrow type string parsing
            arrow_type = arrow_type_from_str(type)

    elif isinstance(type, np.dtype):
        # NumPy dtype
        # NOTE(review): the visible module imports pyarrow unconditionally,
        #  so this guard looks unreachable -- confirm before relying on it.
        if pa is None:
            raise TypeError(
                "PyArrow is required to convert NumPy dtypes. "
                "Please install pyarrow package."
            )
        arrow_type = pa.from_numpy_dtype(type)

    elif isinstance(type, pd.api.extensions.ExtensionDtype):
        # Pandas ExtensionDtype
        if isinstance(type, (ArrowDtype, ExternalBlobDtype)):
            # Already an ArrowDtype or an ExternalBlobDtype, return as-is
            return type
        elif isinstance(type, pd.StringDtype):
            # Convert StringDtype to pyarrow-backed string dtype
            try:
                return pd.StringDtype("pyarrow")
            except (TypeError, ImportError):
                # Fall back to ArrowDtype if StringDtype("pyarrow") not available
                arrow_type = pa.string()
        elif hasattr(type, "pyarrow_dtype"):
            # Extension dtype with PyArrow backing
            arrow_type = type.pyarrow_dtype
        else:
            # Try to convert via the dtype's NumPy equivalent
            try:
                arrow_type = pa.from_numpy_dtype(type.numpy_dtype)
            except (AttributeError, TypeError):
                raise TypeError(
                    f"Cannot convert pandas ExtensionDtype '{type}' to MaxFrame dtype. "
                    f"Supported types include ArrowDtype, StringDtype, and numeric dtypes."
                )

    elif odps_types is not None and isinstance(type, odps_types.DataType):
        # ODPS type
        arrow_type = odps_type_to_arrow_type(type, "column")

    elif pa is not None and isinstance(type, pa.DataType):
        # Already a PyArrow type
        arrow_type = type

    else:
        # Unsupported type
        raise TypeError(
            f"Unsupported type '{type}' of type '{type.__class__.__name__}'. "
            f"Supported types are: str, np.dtype, pd.ExtensionDtype, "
            f"odps.types.DataType (if odps installed), or pa.DataType (if pyarrow installed)."
        )

    # Wrap Arrow type in ArrowDtype
    return wrap_arrow_dtype(arrow_type)


# Memoized variant of ``_dtype``; keeps at most 1000 distinct (hashable) inputs.
_cached_dtype = functools.lru_cache(maxsize=1000)(_dtype)


def dtype(type):
    """
    Recommend dtype for MaxFrame DataFrame operations.

    Converts various type representations into MaxFrame-compatible dtypes.
    Returns pd.ArrowDtype for most types and ExternalBlobDtype for blob types.

    Parameters
    ----------
    type : str, np.dtype, pd.ExtensionDtype, odps.types.DataType, pa.DataType, or Python type class
        The type to convert to MaxFrame-compatible dtype.
        Supported input types:

        - str: Type string like 'int64', 'string', 'blob',
          'list<item: int64>', etc.
        - np.dtype: NumPy dtype objects
        - pd.ExtensionDtype: Pandas extension dtypes
        - odps.types.DataType: ODPS data types (requires odps package)
        - pa.DataType: PyArrow data types
        - Python type classes: int, float, bool, str, bytes
          (int -> int64, float -> float64, bool -> bool,
          str -> StringDtype(pyarrow), bytes -> binary)

    Returns
    -------
    ArrowDtype or ExternalBlobDtype
        Recommended dtype for use in MaxFrame operations.
        Returns ExternalBlobDtype for blob types, ArrowDtype for all others.

    Raises
    ------
    TypeError
        If the input type is not supported or cannot be converted.

    Examples
    --------
    >>> import maxframe.dataframe as md
    >>> md.dtype("int64")
    ArrowDtype(int64[pyarrow])
    >>> md.dtype("blob")
    ExternalBlobDtype()
    >>> import numpy as np
    >>> md.dtype(np.dtype('float32'))
    ArrowDtype(float[pyarrow])
    >>> from odps import types as odps_types
    >>> md.dtype(odps_types.string)
    ArrowDtype(string[pyarrow])
    >>> md.dtype(int)
    ArrowDtype(int64[pyarrow])
    >>> md.dtype(float)
    ArrowDtype(double[pyarrow])
    >>> md.dtype(str)
    StringDtype(pyarrow)
    >>> md.dtype(bytes)
    ArrowDtype(binary[pyarrow])
    """
    try:
        return _cached_dtype(type)
    except TypeError as ex:
        # Conversion errors from the cached call propagate unchanged.
        if "unhashable" not in str(ex):
            raise
        # The argument could not be used as a cache key: run the uncached
        # conversion so the caller gets the real result or the real error.
        return _dtype(type)
def infer_dtype(obj: Any) -> Union[ArrowDtype, ExternalBlobDtype]:
    """
    Infer MaxFrame-compatible dtype from a Python object.

    Creates a single-element PyArrow array from the object to infer its
    type, then converts to MaxFrame-compatible dtype using the dtype()
    function.

    Parameters
    ----------
    obj : Any
        Python object to infer dtype from. Can be:

        - Scalar values (int, float, bool, str, bytes)
        - Lists/tuples (for list/array types)
        - Dicts (for struct types)
        - Blob objects (SolidBlob)
        - Any other Python object supported by PyArrow

    Returns
    -------
    ArrowDtype or ExternalBlobDtype
        Inferred dtype for the object.

    Raises
    ------
    TypeError
        If the object type is not supported or dtype cannot be inferred.

    Examples
    --------
    >>> import maxframe.dataframe as md
    >>> md.infer_dtype(42)
    ArrowDtype(int64[pyarrow])
    >>> md.infer_dtype(3.14)
    ArrowDtype(double[pyarrow])
    >>> md.infer_dtype("hello")
    StringDtype(pyarrow)
    >>> md.infer_dtype([1, 2, 3])
    ArrowDtype(list<item: int64>[pyarrow])
    >>> md.infer_dtype({"a": 1, "b": 2})
    ArrowDtype(struct<a: int64, b: int64>[pyarrow])
    """
    if obj is None:
        raise TypeError("Cannot infer dtype from None")

    # Blob objects always map to ExternalBlobDtype and skip Arrow inference.
    if isinstance(obj, AbstractExternalBlob):
        return ExternalBlobDtype()

    try:
        # Let PyArrow infer the type from a one-element array, then reuse
        # dtype() so the result matches every other conversion path.
        return dtype(pa.array([obj]).type)
    except (pa.ArrowInvalid, pa.ArrowTypeError) as exc:
        raise TypeError(
            f"Cannot infer dtype from object of type {type(obj).__name__}: {exc}"
        ) from exc
def register_pandas_typing_funcs():
    """Make ``pd.DataFrame``, ``pd.Series`` and ``pd.Index`` subscriptable.

    For each of the three pandas classes that does not already define
    ``__class_getitem__``, installs a classmethod that forwards the
    subscript argument to ``from_getitem_args`` of the matching hint class
    (``DataFrameType``, ``SeriesType`` or ``IndexType``).
    """

    def _forward_getitem(cls, item, type_cls):
        return type_cls.from_getitem_args(item)

    hint_classes = (
        (pd.DataFrame, DataFrameType),
        (pd.Series, SeriesType),
        (pd.Index, IndexType),
    )
    for pandas_cls, hint_cls in hint_classes:
        # Leave classes alone that already support subscripting.
        if hasattr(pandas_cls, "__class_getitem__"):  # pragma: no cover
            continue
        forwarder = functools.partial(_forward_getitem, type_cls=hint_cls)
        pandas_cls.__class_getitem__ = classmethod(forwarder)