# Copyright 1999-2026 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from enum import Enum
from typing import Any, Dict
import numpy as np
import pandas as pd
from maxframe import dataframe as md
from maxframe.core.entity.output_types import OutputType
from maxframe.core.operator.base import Operator
from maxframe.core.operator.core import TileableOperatorMixin
from maxframe.dataframe.core import DATAFRAME_TYPE, SERIES_TYPE
from maxframe.dataframe.operators import DataFrameOperatorMixin
from maxframe.dataframe.utils import parse_index
from maxframe.serialization.serializables import Int32Field
from maxframe.serialization.serializables.core import Serializable
from maxframe.serialization.serializables.field import (
AnyField,
BoolField,
DictField,
StringField,
)
# Task type constants
TASK_TEXT_GENERATION = "text-generation"
TASK_SENTENCE_EMBEDDING = "sentence-embedding"
TASK_IMAGE_TEXT_TO_TEXT = "image-text-to-text"
TASK_MULTI_MODAL_EMBEDDING = "multi-modal-embedding"

# Values used as the ``type`` of a content part
TEXT_CONTENT_PART = "text"
IMAGE_CONTENT_PART = "image"

# Raw values backing the image content type enumeration
IMAGE_URL_CONTENT = "image_url"
IMAGE_BINARY_CONTENT = "binary"
IMAGE_BASE64_CONTENT = "base64"

# Key under which an image's MIME type is stored in its content mapping
IMAGE_MIME_TYPE = "mime_type"
# Image payload encodings accepted by ContentPart.image().
class ImageContentType(str, Enum):
    """Enumerates the ways the ``data`` of an image content part can be given.

    Because the enum mixes in ``str``, each member is also a string equal to
    its raw value, and ``ImageContentType(raw_value)`` resolves a plain string
    to the matching member.
    """

    IMAGE_URL = IMAGE_URL_CONTENT
    BINARY = IMAGE_BINARY_CONTENT
    BASE64 = IMAGE_BASE64_CONTENT
# Raw string value of every ImageContentType member, in declaration order.
IMAGE_CONTENT_KEYS = tuple(member.value for member in ImageContentType)
# One piece of content: either text or an image (see ContentPart.text / ContentPart.image).
class ContentPart(Serializable):
    """A typed piece of content together with optional per-part options.

    Instances are normally created through the :meth:`text` and :meth:`image`
    factories. Wherever a series object is passed as a value, it is replaced
    by a ``{<series name>}`` placeholder string; any other value is stored
    unchanged.
    """

    type: str = StringField("type")
    content: Any = AnyField("content")
    options: Dict[str, Any] = DictField("options", default_factory=dict)

    @classmethod
    def text(cls, text: Any) -> "ContentPart":
        """Create a text part from ``text``.

        Args:
            text: The text content, or a series to be referenced by name.

        Returns:
            A part whose ``type`` is the text content type.
        """
        rendered = _series_as_column_template(text)
        return cls(TEXT_CONTENT_PART, rendered)

    @classmethod
    def image(
        cls,
        *,
        data: Any,
        type: ImageContentType,
        mime_type: Any = None,
        **options: Any,
    ) -> "ContentPart":
        """Create an image part.

        Args:
            data: The image payload, or a series to be referenced by name.
            type: How ``data`` is encoded; an ``ImageContentType`` member or
                its raw string value.
            mime_type: MIME type of the image. Mandatory for binary and
                base64 payloads; stored whenever it is given.
            **options: Extra per-part options; series values are turned into
                placeholders as well.

        Returns:
            A part whose ``type`` is the image content type and whose
            ``content`` is a dict holding the encoding, the data and, when
            provided, the MIME type.

        Raises:
            ValueError: If ``type`` is not a valid image content type, or if
                ``mime_type`` is missing for binary/base64 content.
        """
        kind = ImageContentType(type)
        mime_required = kind in (ImageContentType.BINARY, ImageContentType.BASE64)
        if mime_required and mime_type is None:
            raise ValueError(
                "mime_type is required for binary and base64 image content"
            )

        payload = {
            "type": kind.value,
            "data": _series_as_column_template(data),
        }
        if mime_type is not None:
            payload[IMAGE_MIME_TYPE] = _series_as_column_template(mime_type)

        rendered_options = {
            key: _series_as_column_template(val) for key, val in options.items()
        }
        return cls(IMAGE_CONTENT_PART, payload, rendered_options)
def _series_as_column_template(value: Any):
    """Replace a series by a ``{name}`` placeholder; pass other values through.

    Args:
        value: Any object.

    Returns:
        The string ``"{<value.name>}"`` when ``value`` is an instance of
        ``SERIES_TYPE``, otherwise ``value`` itself, untouched.
    """
    if not isinstance(value, SERIES_TYPE):
        return value
    column = value.name
    # Doubled braces emit literal "{" and "}" around the formatted name.
    return f"{{{column}}}"
class LLM(Serializable):
    """Serializable model descriptor carrying an optional ``name``."""

    name = StringField("name", default=None)

    @property
    def content_part(self):
        """The ``ContentPart`` class itself (not an instance)."""
        return ContentPart

    def validate_params(self, params: Dict[str, Any]):
        """Validation hook for ``params``; this implementation does nothing.

        Args:
            params: Parameter mapping to check.
        """
def validate_llm_input_data(data):
    """Ensure ``data`` is a dataframe or series object.

    Args:
        data: Object to check against ``DATAFRAME_TYPE`` and ``SERIES_TYPE``.

    Raises:
        ValueError: If ``data`` is an instance of neither type.
    """
    if isinstance(data, DATAFRAME_TYPE) or isinstance(data, SERIES_TYPE):
        return
    raise ValueError("data must be a maxframe dataframe or series object")
class LLMTaskOperator(Operator, DataFrameOperatorMixin):
    """Base operator for LLM tasks.

    Calling an instance on ``data`` creates a dataframe whose columns and
    dtypes are taken from :meth:`get_output_dtypes`, which subclasses must
    implement.
    """

    task = AnyField("task", default=None)
    model = AnyField("model", default=None)
    params = DictField("params", default=None)
    running_options: Dict[str, Any] = DictField("running_options", default=None)
    timeout = Int32Field("timeout", default=None)

    def __init__(self, output_types=None, **kw):
        """Initialise the operator.

        Args:
            output_types: Output types of the operator; defaults to a single
                dataframe output.
            **kw: Field values forwarded to the base constructor.
                ``running_options`` may be omitted or ``None``; either way
                the quota entries are filled in with defaults.

        Raises:
            TypeError: If a quota name in ``running_options`` is neither
                ``None`` nor a string.
        """
        if output_types is None:
            output_types = [OutputType.dataframe]
        # Work on a private copy: _setup_default_quotas writes into the
        # mapping, which must not leak into the caller's dict, and an explicit
        # ``running_options=None`` has to behave like an omitted argument.
        running_options = dict(kw.pop("running_options", None) or {})
        self._setup_default_quotas(running_options)
        super().__init__(
            _output_types=output_types, running_options=running_options, **kw
        )

    @staticmethod
    def _setup_default_quotas(running_options):
        """Setup default quota configurations.

        For ``gu_quota_name`` and ``inference_quota_name``, an entry missing
        from ``running_options`` is filled in from the attribute of the same
        name on ``maxframe.options.session``. The mapping is modified in
        place.

        Args:
            running_options: Mutable mapping of running options.

        Returns:
            The same ``running_options`` object.

        Raises:
            TypeError: If a quota value is neither ``None`` nor a string.
        """
        # Imported lazily, as in the original implementation.
        from maxframe import options

        for quota_name in ("gu_quota_name", "inference_quota_name"):
            quota = running_options.setdefault(
                quota_name, getattr(options.session, quota_name)
            )
            if quota is not None and not isinstance(quota, str):
                raise TypeError(f"{quota_name} must be a string")
        return running_options

    def get_output_dtypes(self) -> Dict[str, np.dtype]:
        """Return the output schema as a mapping of column name to dtype.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def __call__(self, data, index=None):
        """Create the output dataframe for ``data``.

        Args:
            data: Input object; it becomes the operator's only input and,
                unless ``index`` is given, supplies ``index_value``.
            index: Optional index value for the output; when ``None`` the
                input's ``index_value`` is used.

        Returns:
            The object produced by ``new_dataframe``, with an unknown row
            count (``np.nan``) and one column per output dtype.
        """
        outputs = self.get_output_dtypes()
        col_names = list(outputs)
        columns = parse_index(pd.Index(col_names), store_data=True)
        out_dtypes = pd.Series(list(outputs.values()), index=col_names)
        # Explicit None check: relying on truthiness (``index or ...``) would
        # discard a falsy index object or fail on objects without a bool value.
        index_value = index if index is not None else data.index_value
        return self.new_dataframe(
            inputs=[data],
            shape=(np.nan, len(col_names)),
            index_value=index_value,
            columns_value=columns,
            dtypes=out_dtypes,
        )

    def can_fuse_with_custom_code(self) -> bool:
        """Return ``False``: this operator reports itself as not fusable."""
        return False
class LLMTextGenOperator(LLMTaskOperator, TileableOperatorMixin):
    """LLM task operator that carries a ``prompt_template``.

    Its output has a ``response`` column and a ``success`` column.
    """

    prompt_template = AnyField("prompt_template", default=None)

    def get_output_dtypes(self) -> Dict[str, np.dtype]:
        """Return the dtypes of the ``response`` and ``success`` columns."""
        response_dtype = md.dtype("string")
        success_dtype = np.dtype("bool")
        return {"response": response_dtype, "success": success_dtype}
class LLMTextEmbeddingOp(LLMTaskOperator, TileableOperatorMixin):
    """LLM task operator configured by embedding-related fields.

    Its output has a ``response`` column and a ``success`` column.
    """

    input = StringField("input", default=None)
    dimensions = Int32Field("dimensions", default=None)
    encoding_format = StringField("encoding_format", default=None)
    simple_output = BoolField("simple_output", default=False)

    def get_output_dtypes(self) -> Dict[str, np.dtype]:
        """Return the dtypes of the ``response`` and ``success`` columns."""
        schema = {}
        schema["response"] = md.dtype("string")
        schema["success"] = np.dtype("bool")
        return schema