Build a multimodal image pipeline with OpenLake, Object Table, and MaxFrame
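In multimodal analytics, large-scale unstructured data needs both catalog metadata and a distributed runtime. MaxCompute Object Table automatically collects OSS object metadata, and MaxFrame runs Python transformations against that catalog. Combined with OpenLake (DLF + Paimon), you can resize images in parallel and write the processed bytes to a lake table for retrieval, AI Functions, or downstream training.
This tutorial uses a public image directory on OSS. It first registers the metadata in an Object Table, then resizes each image to a 150×150 BMP with a small UDF, and finally writes the results to Paimon through a DLF-backed catalog.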
End-to-end flow
OSS images → Object table (metadata) → MaxFrame SQL / DataFrame
    → UDF resize (distributed apply)
    → Paimon on DLF (OpenLake)
    → optional read-back for QA / retrieval prep
Prerequisites
MaxCompute
A MaxCompute project with the three-tier model (project / schema / object) enabled. See Schemas.
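API and Tunnel endpoints that match your network (VPC *.aliyun-inc.com or public *.aliyun.com). See Get Endpoints.
RAM credentials or a default credential chain available in the notebook (the examples use CredentialProviderAccount and DefaultCredentialsProvider).
Optionally, an external project name (external_project_name in the examples) if you read the Paimon table through a MaxCompute external project.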
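The endpoint choice can be captured in a small helper. This is a minimal sketch: the function name maxcompute_endpoints and the use_vpc flag are hypothetical. The VPC URL pattern is the one used in this tutorial's configuration, and the public pattern assumes the same host layout under aliyun.com, so confirm both against the endpoint documentation for your region.

```python
def maxcompute_endpoints(region, use_vpc=True):
    """Return (api_endpoint, tunnel_endpoint) for a region.

    Assumption: VPC hosts end in aliyun-inc.com and public hosts in
    aliyun.com, with otherwise identical host names.
    """
    domain = "aliyun-inc.com" if use_vpc else "aliyun.com"
    api = f"http://service.{region}.maxcompute.{domain}/api"
    tunnel = f"http://dt.{region}.maxcompute.{domain}"
    return api, tunnel


endpoint, tunnel_endpoint = maxcompute_endpoints("cn-shanghai", use_vpc=True)
print(endpoint)
print(tunnel_endpoint)
```

From a notebook that runs outside the VPC, pass use_vpc=False to get the public hosts instead.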
OpenLake: DLF + Paimon
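A DLF catalog configured in the same region, plus an AccessKey or other supported credentials for the catalog API. See Data Lake Formation.
The Paimon Java/Python client libraries (for example, paimon_python_java, paimon_python_api, and pyarrow) installed in the environment that runs the write step. Follow your OpenLake onboarding guide for the exact package names and installation steps.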
Configuration placeholders
project_name = "[your-project-name]"
external_project_name = "[your-external-project-name]"
region = "cn-shanghai"
table_schema = "[your-schema]"
object_table_name = "[your-object-table]"
paimon_table_name = "[your-paimon-table]"
endpoint = f"http://service.{region}.maxcompute.aliyun-inc.com/api"
tunnel_endpoint = f"http://dt.{region}.maxcompute.aliyun-inc.com"
dlf_region = "[dlf-region]"
dlf_catalog_id = "[dlf-catalog-id]"
dlf_database_name = "default"
dlf_catalog_access_key_id = "[dlf-access-key-id]"
dlf_catalog_access_key_secret = "[dlf-access-key-secret]"
dlf_endpoint = f"dlfnext-vpc.{dlf_region}.aliyuncs.com"
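Before running the later steps, it can help to confirm that every bracketed placeholder has been replaced. The following is a minimal sketch, not part of the original notebook: unfilled_placeholders is a hypothetical helper, and the sample config dictionary only mirrors a few of the variable names above.

```python
import re

# A placeholder is a value that is entirely one "[...]" token.
_PLACEHOLDER = re.compile(r"^\[[^\[\]]+\]$")


def unfilled_placeholders(config):
    """Return the sorted names of settings that still hold a placeholder."""
    return sorted(
        name
        for name, value in config.items()
        if isinstance(value, str) and _PLACEHOLDER.match(value)
    )


config = {
    "project_name": "[your-project-name]",
    "region": "cn-shanghai",
    "dlf_catalog_id": "[dlf-catalog-id]",
}
missing = unfilled_placeholders(config)
print(missing)
```

An empty list means every checked setting has a real value.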
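Step 1: Preview a public OSS object (optional)
The example uses anonymous OSS access to read a single JPEG from a public demo prefix. When you move to private data, replace bucket_name and object_key with your own layout and use authenticated credentials instead of anonymous access.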
import io
import matplotlib.pyplot as plt
import oss2
from PIL import Image
# Same public bucket as the Object Table LOCATION in step 3.
bucket_name = f"dataworks-dataset-{region}"
object_key = "public-datasets/L1_Multimodal/cats-vs-sheeps/cat.1.jpg"
bucket = oss2.Bucket(
    oss2.AnonymousAuth(),
    f"oss-{region}-internal.aliyuncs.com",
    bucket_name,
)
image_data = bucket.get_object(object_key).read()
image = plt.imread(io.BytesIO(image_data), format="jpeg")
plt.imshow(image)
plt.axis("off")
plt.show()
meta = bucket.head_object(object_key)
content_length = meta.headers.get("Content-Length")
print(f"Original size (bytes): {content_length}")
image = Image.open(io.BytesIO(image_data))
width, height = image.size
print(f"Original width: {width}px, height: {height}px")
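Step 2: Open a MaxFrame session
The session configuration enables schema mode, tunes Object Table splitting for higher concurrency, and disables features that conflict with this experimental job.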
from alibabacloud_credentials import providers
from maxframe import new_session, options
import maxframe.dataframe as md
from odps import ODPS
from odps.accounts import CredentialProviderAccount
options.sql.settings = {
    "odps.namespace.schema": "true",
    "odps.task.major.version": "default",
    "odps.sql.allow.namespace.schema": "true",
    "odps.sql.auto.merge.enabled": "false",
    "odps.sql.object.table.split.by.object.size.enabled": "true",
    "odps.sql.object.table.split.unit.kb": "1000",
    "odps.sql.offline.result.cache.enable": "false",
    "odps.sql.split.v2": "false",
    "odps.stage.mapper.split.size": "10",
    "odps.sql.type.system.odps2": "true",
}
options.sql.enable_mcqa = False
options.sql.auto_use_common_image = False
options.session.enable_schema = True
account = CredentialProviderAccount(providers.DefaultCredentialsProvider())
o = ODPS(
    account=account,
    project=project_name,
    endpoint=endpoint,
    tunnel_endpoint=tunnel_endpoint,
)
session = new_session(o)
print(f"MaxFrame session id: {session.session_id}")
print(session.get_logview_address())
Step 3: Create and refresh the Object Table
Point LOCATION at the OSS prefix that holds the images. Refresh the metadata before querying keys from MaxFrame.
oss_prefix = (
    "oss://oss-cn-shanghai-internal.aliyuncs.com/"
    "dataworks-dataset-cn-shanghai/public-datasets/L1_Multimodal/cats-vs-sheeps/"
)
fq_ot = f"{project_name}.{table_schema}.{object_table_name}"
o.execute_sql(
    f"CREATE OBJECT TABLE IF NOT EXISTS {fq_ot} LOCATION '{oss_prefix}'",
    hints=options.sql.settings,
)
o.execute_sql(
    f"ALTER TABLE {fq_ot} REFRESH METADATA",
    hints=options.sql.settings,
)
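The LOCATION string above hardcodes the region. If you run the tutorial in another region, a small builder keeps the endpoint host and bucket name in sync. This is a sketch under the assumption that the internal OSS endpoint follows the oss-{region}-internal.aliyuncs.com pattern used in step 1; object_table_location is a hypothetical helper name.

```python
def object_table_location(region, bucket, prefix):
    """Build an oss:// LOCATION string that points at a directory prefix.

    Assumption: the internal OSS host is oss-{region}-internal.aliyuncs.com.
    """
    prefix = prefix.strip("/")
    return f"oss://oss-{region}-internal.aliyuncs.com/{bucket}/{prefix}/"


location = object_table_location(
    "cn-shanghai",
    "dataworks-dataset-cn-shanghai",
    "public-datasets/L1_Multimodal/cats-vs-sheeps/",
)
print(location)
```

The result always ends with a single trailing slash, matching the directory-style prefix used in this step.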
Step 4: Inspect object metadata and one image payload
ot_sample = (
    md.read_odps_query(f"SELECT key, size, type, owner_id FROM {fq_ot}")
    .execute()
    .fetch()
)
print(ot_sample.head(12))
df = md.read_odps_query(
    f"SELECT key, "
    f"base64(get_data_from_oss('{fq_ot}', key)) AS data "
    f"FROM {fq_ot} WHERE key = 'cat.1.jpg'",
    index_col="key",
)
print(df.execute().fetch())
get_data_from_oss is a MaxCompute SQL helper function that reads an object's bytes by key. The notebook encodes the result as base64 so that the Python UDF can decode it without mounting OSS inside the mapper.
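The base64 hand-off can be illustrated with the standard library alone. This sketch uses a made-up byte string in place of a real image; it shows that the round trip is lossless and that the encoded text is about one third larger than the raw bytes.

```python
import base64

# Stand-in payload: JPEG magic bytes followed by filler, not a real image.
raw = bytes([0xFF, 0xD8, 0xFF, 0xE0]) + b"not a real image"

# What the SQL side produces: an ASCII string that is safe to carry in a column.
encoded = base64.b64encode(raw).decode("ascii")

# What the UDF does first: recover the original bytes.
decoded = base64.b64decode(encoded)

print(len(raw), len(encoded), decoded == raw)
```

Base64 emits 4 characters for every 3 input bytes (rounded up), which is worth keeping in mind when sizing splits for large images.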
Step 5: Resize images in a MaxFrame UDF
@with_python_requirements ships Pillow, pandas, and cloudpickle to the workers.
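from maxframe.udf import with_python_requirements
@with_python_requirements("pillow", "pandas", "cloudpickle")
def apply_func(row):
    # Import inside the function so the modules resolve on the worker.
    import base64
    import io
    from PIL import Image
    # The last column holds the base64-encoded image bytes.
    # Normalize to RGB so grayscale or CMYK sources get a valid black fill
    # and can be saved as BMP.
    src_image = Image.open(io.BytesIO(base64.b64decode(row.iloc[-1]))).convert("RGB")
    canvas = Image.new("RGB", (150, 150), (0, 0, 0))
    # Scale the longest side to 150 px while preserving the aspect ratio.
    scale = 150.0 / max(src_image.size)
    resized = src_image.resize(
        tuple(int(s * scale) for s in src_image.size)
    )
    # Paste at the top-left corner; the rest of the canvas stays black.
    canvas.paste(resized, (0, 0))
    sink = io.BytesIO()
    canvas.save(sink, "bmp")
    row = row.copy()
    row.iloc[-1] = base64.b64encode(sink.getvalue()).decode()
    return row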
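The sizing arithmetic in apply_func can be checked locally without Pillow or a MaxFrame session. This sketch mirrors the scale computation from the UDF; fit_within is a hypothetical helper name introduced only for illustration.

```python
def fit_within(size, box=150):
    """Scale (width, height) so the longest side equals `box`.

    Uses the same formula as apply_func: a float scale factor followed by
    truncation to int.
    """
    scale = float(box) / max(size)
    return tuple(int(s * scale) for s in size)


landscape = fit_within((400, 300))
portrait = fit_within((300, 400))
print(landscape, portrait)
```

Because the resized image is pasted at (0, 0), a 400×300 source fills the top 112 rows of the 150×150 canvas and leaves the remaining rows black.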
Step 6: Run apply on MaxCompute
apply_df = df.apply(
    apply_func,
    axis=1,
    dtypes=df.dtypes,
    output_type="dataframe",
)
print(apply_df.execute().fetch())
Step 7: Write the results to Paimon (DLF catalog)
Install the Paimon Python bindings and PyArrow in the client environment that runs this cell. The catalog uses the dlf-paimon metastore type.
import pyarrow as pa
from paimon_python_api import Schema
from paimon_python_java import Catalog
catalog_options = {
    "metastore": "dlf-paimon",
    "dlf.endpoint": dlf_endpoint,
    "dlf.region": dlf_region,
    "dlf.catalog.id": dlf_catalog_id,
    "dlf.catalog.accessKeyId": dlf_catalog_access_key_id,
    "dlf.catalog.accessKeySecret": dlf_catalog_access_key_secret,
}
catalog = Catalog.create(catalog_options)
pandas_df = apply_df.to_pandas()
record_batch = pa.RecordBatch.from_pandas(pandas_df)
schema = Schema(record_batch.schema)
fq_paimon = f"{dlf_database_name}.{paimon_table_name}"
catalog.create_table(fq_paimon, schema, True)
table = catalog.get_table(fq_paimon)
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
table_write.write_arrow_batch(record_batch)
commit_messages = table_write.prepare_commit()
table_commit.commit(commit_messages)
table_write.close()
table_commit.close()
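If the write or the commit raises, the close() calls above are never reached. One way to guard against that is a try/finally wrapper. The sketch below is an assumption about how you might structure it, not part of the Paimon API: write_batch_and_commit is a hypothetical helper that relies only on the methods already used in this step.

```python
def write_batch_and_commit(table, record_batch):
    """Write one Arrow record batch and commit it, always closing handles.

    `table` is expected to expose new_batch_write_builder(), as the Paimon
    table object does in the step above.
    """
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()
    try:
        table_write.write_arrow_batch(record_batch)
        table_commit.commit(table_write.prepare_commit())
    finally:
        # Release the writer and committer even if the write or commit failed.
        table_write.close()
        table_commit.close()
```

With this helper, the write cell reduces to write_batch_and_commit(table, record_batch) after catalog.get_table(fq_paimon).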
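Step 8: Read back from MaxCompute and visualize
The example reads the data through an external project bound to the lake catalog. Adjust the fully qualified name to match your deployment.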
import base64
import io
import matplotlib.pyplot as plt
from PIL import Image
sql = (
    f"SELECT data FROM {external_project_name}."
    f"{dlf_database_name}.{paimon_table_name}"
)
with o.execute_sql(sql, hints=options.sql.settings).open_reader(
    tunnel=False
) as reader:
    # Only the first row is needed for a visual check.
    rec = next(reader)
# Decode once and reuse the bytes for both display and size reporting.
raw_bytes = base64.b64decode(rec[-1])
img = Image.open(io.BytesIO(raw_bytes))
plt.imshow(img)
plt.show()
width, height = img.size
print(f"Result size (bytes): {len(raw_bytes)}")
print(f"Result width: {width}px, height: {height}px")
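The printed byte count can be sanity-checked against the BMP layout. The sketch below assumes an uncompressed 24-bit BMP with a 54-byte header (14-byte file header plus 40-byte info header) and rows padded to a multiple of 4 bytes; sources that are not RGB, such as grayscale images saved with a palette, will produce a different size. bmp_file_size is a hypothetical helper.

```python
def bmp_file_size(width, height, bits_per_pixel=24, header_bytes=54):
    """Estimate the size of an uncompressed BMP file in bytes.

    Each pixel row is padded to a 4-byte boundary.
    """
    row_bytes = (width * bits_per_pixel + 31) // 32 * 4
    return header_bytes + row_bytes * height


expected = bmp_file_size(150, 150)
print(expected)
```

For a 150×150 canvas each row takes 452 bytes (450 bytes of pixel data plus 2 bytes of padding), so an RGB result should decode to roughly 67,854 bytes.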
Step 9: Close the session
session.destroy()
print("Session closed")
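If an earlier cell fails, session.destroy() may never run. When the pipeline is executed as a script rather than cell by cell, a small context manager can make the cleanup unconditional. This is a sketch: destroying is a hypothetical helper, and it assumes only that the session object exposes destroy(), as used above.

```python
from contextlib import contextmanager


@contextmanager
def destroying(session):
    """Yield the session and call session.destroy() on exit, even on error."""
    try:
        yield session
    finally:
        session.destroy()
```

Usage would look like `with destroying(new_session(o)) as session: ...`, with steps 3 through 8 inside the block.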
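Further reading
To extend the UDF with heavier dependencies, see View and use custom images.
For the three-tier project structure, see Schemas and namespace mode.
Use the Run this tutorial on DataWorks Notebook button at the top of the page to open the gallery entry, which includes the notebook and template assets.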