多模态图像特征生产方案#
背景信息#
企业数据平台越来越多地使用图片、文本、音频和视频等多模态数据。这些数据通常格式异构、结构化程度低,难以直接支撑检索、推荐、审核、聚类和分析等下游场景。
一个可复用的生产模式是将链路标准化:下载并解码原始文件、统一格式、执行语义理解、生成标签、提取 Embedding,并将向量写入可复用的特征表。
本方案依托 MaxFrame 的分布式处理能力,演示一条面向多场景复用的图像特征生产链路:批量预处理图像、调用多模态模型生成语义 Embedding,并将结果写入表中,供检索、推荐、审核、聚类和分析等下游任务使用。
适用场景#
多模态检索与相似内容召回。
为推荐与排序系统生产图像特征。
内容理解、审核与质量分析。
基于向量相似度进行去重与聚类。
构建可复用的企业级图像特征资产。
为 RAG、知识库与 Agent 应用准备多模态数据。
核心处理流程#
前提条件#
# |
条件 |
说明 |
|---|---|---|
1 |
开通 MaxCompute |
需要一个已开通的 MaxCompute 项目,并具备有效的 Access ID / Access Key。 |
2 |
开通 DPE 引擎 |
Image 算子与 |
3 |
上传图片至 OSS |
请自行创建 OSS Bucket,并上传本教程使用的五张示例图片。 |
4 |
OSS RAM 角色授权 |
配置用于 OSS 读取访问的 Role ARN。 |
5 |
百炼 API Key |
创建用于多模态 Embedding 调用的 API Key。 |
6 |
MaxFrame SDK 版本 |
请使用 MaxFrame SDK 2.6.0 及以上版本( |
配置 OSS RAM 角色#
请参考 OSS 挂载及使用实践 中“开通服务及授权”部分,创建 OSS Bucket 以及具备 OSS 读取权限的 RAM 角色。本示例代码只需要将 RAM Role ARN 通过 with_fs_mount 的 storage_options={"role_arn": ROLE_ARN} 传入。
控制台指南中的“创建服务关联角色”步骤不是本示例代码会用到的内容。如果你的 MaxCompute 服务已经开通,请重点完成 role_arn 所需的 RAM 角色和 OSS 权限配置。
获取百炼 API Key#
前往 百炼控制台 - API Key 管理 创建或管理 API Key。
环境准备#
import maxframe
assert maxframe.__version__ >= "2.6.0", (
f"maxframe >= 2.6.0 is required, current version: {maxframe.__version__}. "
f"Please run: pip install --upgrade maxframe"
)
import pandas as pd
import maxframe.dataframe as md
from maxframe import new_session
from maxframe.udf import with_python_requirements, with_fs_mount
from maxframe.config import options
from odps import ODPS
pd.set_option("display.max_colwidth", None)
pd.set_option("display.max_columns", None)
options.dag.settings = {"engine_order": ["DPE", "MCSQL"]}
options.dpe.settings = {
"substep.internal_network_whitelist": [
"intranet-cn-beijing.dashscope.aliyuncs.com:443",
],
}
DASHSCOPE_API_KEY = "<your_dashscope_api_key>"
ROLE_ARN = "acs:ram::<your_account_id>:role/<your_role_name>"
OSS_ENDPOINT = "<your_oss_endpoint>" # for example: oss-cn-hangzhou.aliyuncs.com
OSS_BUCKET = "<your_oss_bucket>"
OSS_IMAGE_PREFIX = "maxframe-image-demo/"
o = ODPS(
access_id="<your_access_id>",
secret_access_key="<your_access_key_secret>",
project="<your_mc_project>",
endpoint="https://service.<region>.maxcompute.aliyun.com/api",
)
session = new_session(o)
print(f"Session ID : {session.session_id}")
print(f"LogView : {session.get_logview_address()}")
1. 构建输入数据集#
运行本节前,请将这五张图片上传到 oss://<your_oss_endpoint>/<your_oss_bucket>/maxframe-image-demo/。IMAGE_FILES 中的文件名必须和 OSS Bucket 中的对象名完全一致。
IMAGE_FILES = ["img_001.jpg", "img_002.jpg", "img_003.jpg", "img_004.png", "img_005.jpg"]
df = md.DataFrame({
"id": list(range(1, len(IMAGE_FILES) + 1)),
"filename": IMAGE_FILES,
})
# Production path:
# df = md.read_odps_table("your_image_table", columns=["id", "filename"])
2. 图片下载、解码与重采样#
@with_fs_mount(
path=f"oss://{OSS_ENDPOINT}/{OSS_BUCKET}/{OSS_IMAGE_PREFIX}",
mount_path="/mnt/oss_data",
storage_options={"role_arn": ROLE_ARN},
)
@with_python_requirements("Pillow==10.4.0")
def download_and_process(filename_series, target_size=(512, 512)):
import io
import base64
import pandas as pd
from PIL import Image
results = []
widths, heights, formats = [], [], []
for fname in filename_series:
try:
img_path = f"/mnt/oss_data/{fname}"
img = Image.open(img_path)
widths.append(img.width)
heights.append(img.height)
formats.append(img.format)
img = img.convert("RGB").resize(target_size, Image.LANCZOS)
buf = io.BytesIO()
img.save(buf, format="JPEG", quality=85)
results.append(base64.b64encode(buf.getvalue()).decode("utf-8"))
except Exception:
results.append(None)
widths.append(None)
heights.append(None)
formats.append(None)
return pd.DataFrame(
{
"width": widths,
"height": heights,
"format": formats,
"img_base64": results,
},
index=filename_series.index,
)
result_df = df["filename"].mf.apply_chunk(
download_and_process,
output_type="dataframe",
dtypes={
"width": "float64",
"height": "float64",
"format": "object",
"img_base64": "object",
},
skip_infer=True,
target_size=(512, 512),
)
df["width"] = result_df["width"]
df["height"] = result_df["height"]
df["format"] = result_df["format"]
df["img_base64"] = result_df["img_base64"]
df = df.execute()
3. 多模态 Embedding 推理#
备注
在大规模生产任务中,建议申请更高 API 并发配额以避免 429/403 限流。如果任务超时,请优先打开 LogView,检查 OSS 挂载、图片文件名、RAM 角色权限以及百炼网络访问是否正确。
@with_python_requirements("dashscope>=1.24.6")
def embed_images(base64_series, api_key=None, max_retries=3):
import time
import pandas as pd
import dashscope
from dashscope import MultiModalEmbedding
dashscope.api_key = api_key
dashscope.base_http_api_url = "https://intranet-cn-beijing.dashscope.aliyuncs.com/api/v1"
results = []
for b64_str in base64_series:
if not b64_str:
results.append(None)
continue
for attempt in range(max_retries):
resp = MultiModalEmbedding.call(
model="tongyi-embedding-vision-flash-2026-03-06",
input=[{"image": f"data:image/jpeg;base64,{b64_str}"}],
)
if resp.status_code == 200:
results.append(resp.output["embeddings"][0]["embedding"])
break
if attempt < max_retries - 1:
time.sleep(2**attempt)
else:
results.append(None)
return pd.Series(results, index=base64_series.index)
df["embedding"] = df["img_base64"].mf.apply_chunk(
embed_images,
output_type="series",
dtype="object",
skip_infer=True,
batch_rows=5,
index=df["img_base64"].index,
api_key=DASHSCOPE_API_KEY,
)
df = df.execute()
print(df.fetch())
4. 将 Embedding 结果写入 MaxCompute#
md.to_odps_table(
md.DataFrame(df),
"image_embedding_result",
overwrite=True,
).execute()
print("Results written to table: image_embedding_result")
5. 资源清理#
print(f"LogView: {session.get_logview_address()}")
session.destroy()
print("Session destroyed.")
常见问题排查#
问题 |
原因 |
解决方案 |
|---|---|---|
|
未开通 DPE |
联系管理员在项目中开通 DPE。 |
|
角色权限错误或缺失 |
检查 |
|
API Key 无效 |
在百炼控制台检查 API Key 状态。 |
Embedding 结果为 |
单张图片推理失败 |
检查图片完整性与 base64 编码结果。 |
向量维度不一致 |
混用了不同的 Embedding 模型 |
同一批次或输出表请使用同一 Embedding 模型。 |