多模态图像特征生产方案#

Available at MaxFrame 2.6.0

背景信息#

企业数据平台越来越多地使用图片、文本、音频和视频等多模态数据。这些数据通常格式异构、结构化程度低,难以直接支撑检索、推荐、审核、聚类和分析等下游场景。

一个可复用的生产模式是将链路标准化:下载并解码原始文件、统一格式、执行语义理解、生成标签、提取 Embedding,并将向量写入可复用的特征表。

本方案依托 MaxFrame 的分布式处理能力,演示一条面向多场景复用的图像特征生产链路:批量预处理图像、调用多模态模型生成语义 Embedding,并将结果写入表中,供检索、推荐、审核、聚类和分析等下游任务使用。

适用场景#

  • 多模态检索与相似内容召回。

  • 为推荐与排序系统生产图像特征。

  • 内容理解、审核与质量分析。

  • 基于向量相似度进行去重与聚类。

  • 构建可复用的企业级图像特征资产。

  • 为 RAG、知识库与 Agent 应用准备多模态数据。

核心处理流程#

图像处理与 Embedding 流程

前提条件#

#

条件

说明

1

开通 MaxCompute

需要一个已开通的 MaxCompute 项目,并具备有效的 Access ID / Access Key。

2

开通 DPE 引擎

Image 算子与 apply_chunk 在 DPE 上执行。

3

上传图片至 OSS

请自行创建 OSS Bucket,并上传本教程使用的五张示例图片。

4

OSS RAM 角色授权

配置用于 OSS 读取访问的 Role ARN。

5

百炼 API Key

创建用于多模态 Embedding 调用的 API Key。

6

MaxFrame SDK 版本

请使用 MaxFrame SDK 2.6.0 及以上版本(pip install maxframe>=2.6.0)。

配置 OSS RAM 角色#

请参考 OSS 挂载及使用实践 中“开通服务及授权”部分,创建 OSS Bucket 以及具备 OSS 读取权限的 RAM 角色。本示例代码只需要将 RAM Role ARN 通过 with_fs_mountstorage_options={"role_arn": ROLE_ARN} 传入。

控制台指南中的“创建服务关联角色”步骤不是本示例代码会用到的内容。如果你的 MaxCompute 服务已经开通,请重点完成 role_arn 所需的 RAM 角色和 OSS 权限配置。

获取百炼 API Key#

前往 百炼控制台 - API Key 管理 创建或管理 API Key。

百炼控制台 API Key 管理页面

环境准备#

import maxframe

assert maxframe.__version__ >= "2.6.0", (
    f"maxframe >= 2.6.0 is required, current version: {maxframe.__version__}. "
    f"Please run: pip install --upgrade maxframe"
)

import pandas as pd
import maxframe.dataframe as md
from maxframe import new_session
from maxframe.udf import with_python_requirements, with_fs_mount
from maxframe.config import options
from odps import ODPS

pd.set_option("display.max_colwidth", None)
pd.set_option("display.max_columns", None)

options.dag.settings = {"engine_order": ["DPE", "MCSQL"]}
options.dpe.settings = {
    "substep.internal_network_whitelist": [
        "intranet-cn-beijing.dashscope.aliyuncs.com:443",
    ],
}

DASHSCOPE_API_KEY = "<your_dashscope_api_key>"
ROLE_ARN = "acs:ram::<your_account_id>:role/<your_role_name>"
OSS_ENDPOINT = "<your_oss_endpoint>"  # for example: oss-cn-hangzhou.aliyuncs.com
OSS_BUCKET = "<your_oss_bucket>"
OSS_IMAGE_PREFIX = "maxframe-image-demo/"

o = ODPS(
    access_id="<your_access_id>",
    secret_access_key="<your_access_key_secret>",
    project="<your_mc_project>",
    endpoint="https://service.<region>.maxcompute.aliyun.com/api",
)
session = new_session(o)
print(f"Session ID : {session.session_id}")
print(f"LogView    : {session.get_logview_address()}")

1. 构建输入数据集#

运行本节前,请将这五张图片上传到 oss://<your_oss_endpoint>/<your_oss_bucket>/maxframe-image-demo/IMAGE_FILES 中的文件名必须和 OSS Bucket 中的对象名完全一致。

IMAGE_FILES = ["img_001.jpg", "img_002.jpg", "img_003.jpg", "img_004.png", "img_005.jpg"]

df = md.DataFrame({
    "id": list(range(1, len(IMAGE_FILES) + 1)),
    "filename": IMAGE_FILES,
})

# Production path:
# df = md.read_odps_table("your_image_table", columns=["id", "filename"])

2. 图片下载、解码与重采样#

@with_fs_mount(
    path=f"oss://{OSS_ENDPOINT}/{OSS_BUCKET}/{OSS_IMAGE_PREFIX}",
    mount_path="/mnt/oss_data",
    storage_options={"role_arn": ROLE_ARN},
)
@with_python_requirements("Pillow==10.4.0")
def download_and_process(filename_series, target_size=(512, 512)):
    import io
    import base64
    import pandas as pd
    from PIL import Image

    results = []
    widths, heights, formats = [], [], []
    for fname in filename_series:
        try:
            img_path = f"/mnt/oss_data/{fname}"
            img = Image.open(img_path)
            widths.append(img.width)
            heights.append(img.height)
            formats.append(img.format)

            img = img.convert("RGB").resize(target_size, Image.LANCZOS)
            buf = io.BytesIO()
            img.save(buf, format="JPEG", quality=85)
            results.append(base64.b64encode(buf.getvalue()).decode("utf-8"))
        except Exception:
            results.append(None)
            widths.append(None)
            heights.append(None)
            formats.append(None)

    return pd.DataFrame(
        {
            "width": widths,
            "height": heights,
            "format": formats,
            "img_base64": results,
        },
        index=filename_series.index,
    )

result_df = df["filename"].mf.apply_chunk(
    download_and_process,
    output_type="dataframe",
    dtypes={
        "width": "float64",
        "height": "float64",
        "format": "object",
        "img_base64": "object",
    },
    skip_infer=True,
    target_size=(512, 512),
)

df["width"] = result_df["width"]
df["height"] = result_df["height"]
df["format"] = result_df["format"]
df["img_base64"] = result_df["img_base64"]
df = df.execute()

3. 多模态 Embedding 推理#

备注

在大规模生产任务中,建议申请更高 API 并发配额以避免 429/403 限流。如果任务超时,请优先打开 LogView,检查 OSS 挂载、图片文件名、RAM 角色权限以及百炼网络访问是否正确。

@with_python_requirements("dashscope>=1.24.6")
def embed_images(base64_series, api_key=None, max_retries=3):
    import time
    import pandas as pd
    import dashscope
    from dashscope import MultiModalEmbedding

    dashscope.api_key = api_key
    dashscope.base_http_api_url = "https://intranet-cn-beijing.dashscope.aliyuncs.com/api/v1"

    results = []
    for b64_str in base64_series:
        if not b64_str:
            results.append(None)
            continue
        for attempt in range(max_retries):
            resp = MultiModalEmbedding.call(
                model="tongyi-embedding-vision-flash-2026-03-06",
                input=[{"image": f"data:image/jpeg;base64,{b64_str}"}],
            )
            if resp.status_code == 200:
                results.append(resp.output["embeddings"][0]["embedding"])
                break
            if attempt < max_retries - 1:
                time.sleep(2**attempt)
            else:
                results.append(None)

    return pd.Series(results, index=base64_series.index)

df["embedding"] = df["img_base64"].mf.apply_chunk(
    embed_images,
    output_type="series",
    dtype="object",
    skip_infer=True,
    batch_rows=5,
    index=df["img_base64"].index,
    api_key=DASHSCOPE_API_KEY,
)

df = df.execute()
print(df.fetch())

4. 将 Embedding 结果写入 MaxCompute#

md.to_odps_table(
    md.DataFrame(df),
    "image_embedding_result",
    overwrite=True,
).execute()

print("Results written to table: image_embedding_result")

5. 资源清理#

print(f"LogView: {session.get_logview_address()}")
session.destroy()
print("Session destroyed.")

常见问题排查#

问题

原因

解决方案

Engine DPE not available

未开通 DPE

联系管理员在项目中开通 DPE。

OSS access denied

角色权限错误或缺失

检查 role_arn 与 OSS 读取权限配置。

dashscope API 401

API Key 无效

在百炼控制台检查 API Key 状态。

Embedding 结果为 None

单张图片推理失败

检查图片完整性与 base64 编码结果。

向量维度不一致

混用了不同的 Embedding 模型

同一批次或输出表请使用同一 Embedding 模型。