Source code for maxframe.learn.contrib.llm.core

# Copyright 1999-2026 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from enum import Enum
from typing import Any, Dict

import numpy as np
import pandas as pd

from maxframe import dataframe as md
from maxframe.core.entity.output_types import OutputType
from maxframe.core.operator.base import Operator
from maxframe.core.operator.core import TileableOperatorMixin
from maxframe.dataframe.core import DATAFRAME_TYPE, SERIES_TYPE
from maxframe.dataframe.operators import DataFrameOperatorMixin
from maxframe.dataframe.utils import parse_index
from maxframe.serialization.serializables import Int32Field
from maxframe.serialization.serializables.core import Serializable
from maxframe.serialization.serializables.field import (
    AnyField,
    BoolField,
    DictField,
    StringField,
)

# Task type constants
TASK_TEXT_GENERATION = "text-generation"
TASK_SENTENCE_EMBEDDING = "sentence-embedding"
TASK_IMAGE_TEXT_TO_TEXT = "image-text-to-text"
TASK_MULTI_MODAL_EMBEDDING = "multi-modal-embedding"

TEXT_CONTENT_PART = "text"
IMAGE_CONTENT_PART = "image"
IMAGE_URL_CONTENT = "image_url"
IMAGE_BINARY_CONTENT = "binary"
IMAGE_BASE64_CONTENT = "base64"
IMAGE_MIME_TYPE = "mime_type"


[docs] class ImageContentType(str, Enum): IMAGE_URL = IMAGE_URL_CONTENT BINARY = IMAGE_BINARY_CONTENT BASE64 = IMAGE_BASE64_CONTENT
IMAGE_CONTENT_KEYS = tuple(item.value for item in ImageContentType)
[docs] class ContentPart(Serializable): type: str = StringField("type") content: Any = AnyField("content") options: Dict[str, Any] = DictField("options", default_factory=dict) @classmethod def text(cls, text: Any) -> "ContentPart": return cls(TEXT_CONTENT_PART, _series_as_column_template(text)) @classmethod def image( cls, *, data: Any, type: ImageContentType, mime_type: Any = None, **options: Any, ) -> "ContentPart": image_type = ImageContentType(type) if image_type in (ImageContentType.BINARY, ImageContentType.BASE64): if mime_type is None: raise ValueError( "mime_type is required for binary and base64 image content" ) content = { "type": image_type.value, "data": _series_as_column_template(data), } if mime_type is not None: content[IMAGE_MIME_TYPE] = _series_as_column_template(mime_type) options = {k: _series_as_column_template(v) for k, v in options.items()} return cls(IMAGE_CONTENT_PART, content, options)
def _series_as_column_template(value: Any): if isinstance(value, SERIES_TYPE): return f"{{{value.name}}}" return value class LLM(Serializable): name = StringField("name", default=None) @property def content_part(self): return ContentPart def validate_params(self, params: Dict[str, Any]): pass def validate_llm_input_data(data): if not isinstance(data, DATAFRAME_TYPE) and not isinstance(data, SERIES_TYPE): raise ValueError("data must be a maxframe dataframe or series object") class LLMTaskOperator(Operator, DataFrameOperatorMixin): task = AnyField("task", default=None) model = AnyField("model", default=None) params = DictField("params", default=None) running_options: Dict[str, Any] = DictField("running_options", default=None) timeout = Int32Field("timeout", default=None) def __init__(self, output_types=None, **kw): if output_types is None: output_types = [OutputType.dataframe] running_options = kw.pop("running_options", {}) self._setup_default_quotas(running_options) super().__init__( _output_types=output_types, running_options=running_options, **kw ) @staticmethod def _setup_default_quotas(running_options): """Setup default quota configurations.""" from maxframe import options quota_names = ["gu_quota_name", "inference_quota_name"] for quota_name in quota_names: running_options[quota_name] = running_options.get( quota_name, getattr(options.session, quota_name) ) if running_options[quota_name] is not None and not isinstance( running_options[quota_name], str ): raise TypeError(f"{quota_name} must be a string") return running_options def get_output_dtypes(self) -> Dict[str, np.dtype]: raise NotImplementedError def __call__(self, data, index=None): outputs = self.get_output_dtypes() col_name = list(outputs.keys()) columns = parse_index(pd.Index(col_name), store_data=True) out_dtypes = pd.Series(list(outputs.values()), index=col_name) index_value = index or data.index_value return self.new_dataframe( inputs=[data], shape=(np.nan, len(col_name)), index_value=index_value, columns_value=columns, dtypes=out_dtypes, ) def can_fuse_with_custom_code(self) -> bool: return False class LLMTextGenOperator(LLMTaskOperator, TileableOperatorMixin): prompt_template = AnyField("prompt_template", default=None) def get_output_dtypes(self) -> Dict[str, np.dtype]: return {"response": md.dtype("string"), "success": np.dtype("bool")} class LLMTextEmbeddingOp(LLMTaskOperator, TileableOperatorMixin): input = StringField("input", default=None) dimensions = Int32Field("dimensions", default=None) encoding_format = StringField("encoding_format", default=None) simple_output = BoolField("simple_output", default=False) def get_output_dtypes(self) -> Dict[str, np.dtype]: return {"response": md.dtype("string"), "success": np.dtype("bool")}