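Build a multimodal image pipeline with OpenLake, Object Table, and MaxFrame

In multimodal analytics, large-scale unstructured data needs both catalog metadata and a distributed runtime. A MaxCompute Object Table automatically collects OSS object metadata, and MaxFrame runs Python transformations on top of that catalog. Combined with OpenLake (DLF + Paimon), you can resize images in parallel and write the processed bytes to a lake table for retrieval, AI Functions, or downstream training.

This tutorial uses a public image directory on OSS: it registers the metadata in an Object Table, resizes each image to a 150×150 BMP with a small UDF, and writes the result to Paimon through a DLF-backed catalog.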

End-to-end flow

OSS images  →  Object table (metadata)  →  MaxFrame SQL / DataFrame
     →  UDF resize (distributed apply)
     →  Paimon on DLF (OpenLake)
     →  optional read-back for QA / retrieval prep

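Prerequisites

MaxCompute

  • A MaxCompute project with the three-tier model (project / schema / object) enabled. See Schemas.

  • API and Tunnel endpoints that match your network (VPC: *.aliyun-inc.com; public network: *.aliyun.com). See Get endpoints.

  • RAM credentials or the default credential chain available in the notebook (the example uses CredentialProviderAccount with DefaultCredentialsProvider).

  • Optionally, the name of an external project (external_project_name in the example) if you read the Paimon table through a MaxCompute external project.

OpenLake: DLF + Paimon

  • A DLF catalog configured in the same region, plus an AccessKey or other supported credentials for the catalog API. See Data Lake Formation.

  • The Paimon Java/Python client libraries (for example paimon_python_java, paimon_python_api, and pyarrow) installed in the environment that runs the write step. Follow your OpenLake onboarding guide for the exact package names and installation steps.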

Configuration placeholders

project_name = "[your-project-name]"
external_project_name = "[your-external-project-name]"
region = "cn-shanghai"
table_schema = "[your-schema]"
object_table_name = "[your-object-table]"
paimon_table_name = "[your-paimon-table]"

endpoint = f"http://service.{region}.maxcompute.aliyun-inc.com/api"
tunnel_endpoint = f"http://dt.{region}.maxcompute.aliyun-inc.com"

dlf_region = "[dlf-region]"
dlf_catalog_id = "[dlf-catalog-id]"
dlf_database_name = "default"
dlf_catalog_access_key_id = "[dlf-access-key-id]"
dlf_catalog_access_key_secret = "[dlf-access-key-secret]"

dlf_endpoint = f"dlfnext-vpc.{dlf_region}.aliyuncs.com"
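
The placeholders above can be checked before any remote call is made. The following is a minimal sketch, not part of the original notebook: build_endpoints and unfilled_placeholders are hypothetical helper names, and the public-network host pattern is an assumption derived from the *.aliyun.com suffix mentioned in the prerequisites.

```python
import re

# Assumption: only the two host suffixes named in the prerequisites are modeled.
_SUFFIX = {"vpc": "aliyun-inc.com", "public": "aliyun.com"}


def build_endpoints(region, network="vpc"):
    """Return (api_endpoint, tunnel_endpoint) for a region and network type."""
    suffix = _SUFFIX[network]  # raises KeyError for an unknown network type
    api = f"http://service.{region}.maxcompute.{suffix}/api"
    tunnel = f"http://dt.{region}.maxcompute.{suffix}"
    return api, tunnel


def unfilled_placeholders(config):
    """Return the sorted names of settings that still hold a "[...]" placeholder."""
    pattern = re.compile(r"^\[[^\]]*\]$")
    return sorted(
        name
        for name, value in config.items()
        if isinstance(value, str) and pattern.match(value)
    )


config = {"project_name": "[your-project-name]", "region": "cn-shanghai"}
print(unfilled_placeholders(config))  # ['project_name']
print(build_endpoints("cn-shanghai", "public")[0])
```

Failing fast on an unfilled placeholder tends to produce a clearer error than a rejected request from the service.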

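Step 1: Preview a public OSS object (optional)

The example uses anonymous OSS access to read a single JPEG from a public demo prefix. When you move to private data, replace bucket_name and object_key with your own directory layout.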

import io

import matplotlib.pyplot as plt
import oss2
from PIL import Image

bucket_name = f"dataworks-notebook-{region}"
object_key = "public-datasets/L1_Multimodal/cats-vs-sheeps/cat.1.jpg"

bucket = oss2.Bucket(
    oss2.AnonymousAuth(),
    f"oss-{region}-internal.aliyuncs.com",
    bucket_name,
)

image_data = bucket.get_object(object_key).read()
image = plt.imread(io.BytesIO(image_data), format="jpeg")
plt.imshow(image)
plt.axis("off")
plt.show()

meta = bucket.head_object(object_key)
content_length = meta.headers.get("Content-Length")
print(f"Original size (bytes): {content_length}")

image = Image.open(io.BytesIO(image_data))
width, height = image.size
print(f"Original width: {width}px, height: {height}px")

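Step 2: Open a MaxFrame session

The session configuration enables schema mode, tunes Object Table splitting to increase parallelism, and disables features that conflict with this experimental job.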

from alibabacloud_credentials import providers
from maxframe import new_session, options
import maxframe.dataframe as md
from odps import ODPS
from odps.accounts import CredentialProviderAccount

options.sql.settings = {
    "odps.namespace.schema": "true",
    "odps.task.major.version": "default",
    "odps.sql.allow.namespace.schema": "true",
    "odps.sql.auto.merge.enabled": "false",
    "odps.sql.object.table.split.by.object.size.enabled": "true",
    "odps.sql.object.table.split.unit.kb": "1000",
    "odps.sql.offline.result.cache.enable": "false",
    "odps.sql.split.v2": "false",
    "odps.stage.mapper.split.size": "10",
    "odps.sql.type.system.odps2": "true",
}

options.sql.enable_mcqa = False
options.sql.auto_use_common_image = False
options.session.enable_schema = True

account = CredentialProviderAccount(providers.DefaultCredentialsProvider())
o = ODPS(
    account=account,
    project=project_name,
    endpoint=endpoint,
    tunnel_endpoint=tunnel_endpoint,
)

session = new_session(o)
print(f"MaxFrame session id: {session.session_id}")
print(session.get_logview_address())
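
To experiment with a different setting for a single statement without mutating options.sql.settings, one option is to derive a per-call hints dictionary. This is a sketch only: with_overrides is a hypothetical helper, and writing every value as a string simply mirrors how the settings are spelled in this tutorial.

```python
def with_overrides(base, overrides):
    """Return a new hints dict: base plus overrides, every value as a string."""

    def as_text(value):
        # Booleans become lowercase "true"/"false", matching the tutorial's settings.
        if isinstance(value, bool):
            return "true" if value else "false"
        return str(value)

    merged = dict(base)  # copy, so the caller's dict is left untouched
    merged.update(overrides)
    return {key: as_text(value) for key, value in merged.items()}


base = {"odps.sql.object.table.split.unit.kb": "1000"}
hints = with_overrides(
    base,
    {
        "odps.sql.object.table.split.unit.kb": 500,
        "odps.sql.auto.merge.enabled": False,
    },
)
print(hints)
```

The resulting dictionary could then be passed as the hints argument of an individual execute_sql call.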

Step 3: Create and refresh the Object Table

LOCATION points to the OSS prefix that holds the images. Refresh the metadata before you query keys from MaxFrame.

oss_prefix = (
    "oss://oss-cn-shanghai-internal.aliyuncs.com/"
    "dataworks-dataset-cn-shanghai/public-datasets/L1_Multimodal/cats-vs-sheeps/"
)
fq_ot = f"{project_name}.{table_schema}.{object_table_name}"

o.execute_sql(
    f"CREATE OBJECT TABLE IF NOT EXISTS {fq_ot} LOCATION '{oss_prefix}'",
    hints=options.sql.settings,
)
o.execute_sql(
    f"ALTER TABLE {fq_ot} REFRESH METADATA",
    hints=options.sql.settings,
)
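
Because the DDL is assembled with f-strings, it may help to validate the pieces first. The sketch below is illustrative: object_table_ddl is a hypothetical helper, and both the identifier pattern and the trailing-slash normalization are conservative assumptions rather than documented MaxCompute rules.

```python
import re

# Assumption: a deliberately strict identifier pattern, not the official naming rule.
_IDENT = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def object_table_ddl(project, schema, table, location):
    """Build the CREATE OBJECT TABLE statement after basic sanity checks."""
    for part in (project, schema, table):
        if not _IDENT.match(part):
            raise ValueError(f"unsafe identifier: {part!r}")
    if not location.startswith("oss://"):
        raise ValueError("LOCATION must be an oss:// URI")
    if "'" in location:
        raise ValueError("LOCATION must not contain a single quote")
    if not location.endswith("/"):
        location += "/"  # treat the location as a directory-style prefix
    fq_name = f"{project}.{schema}.{table}"
    return f"CREATE OBJECT TABLE IF NOT EXISTS {fq_name} LOCATION '{location}'"


print(object_table_ddl("demo_project", "demo_schema", "images", "oss://host/bucket/prefix"))
```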

Step 4: Inspect object metadata and the content of one image

ot_sample = (
    md.read_odps_query(f"SELECT key, size, type, owner_id FROM {fq_ot}")
    .execute()
    .fetch()
)
print(ot_sample.head(12))

df = md.read_odps_query(
    f"SELECT key, "
    f"base64(get_data_from_oss('{fq_ot}', key)) AS data "
    f"FROM {fq_ot} WHERE key = 'cat.1.jpg'",
    index_col="key",
)
print(df.execute().fetch())

get_data_from_oss is a MaxCompute SQL helper function that reads an object's bytes by key. The notebook encodes the result as base64 so that the Python UDF can decode it without mounting OSS inside the mapper.
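
The base64 step trades size for a text-safe column. The following sketch shows the round trip locally with the standard library; the synthetic payload stands in for image bytes, and it assumes the SQL base64 function emits standard padded base64, as the decoding in the UDF implies.

```python
import base64


def encoded_length(n_bytes):
    """Length of padded base64 text for n_bytes of input: 4 characters per 3 bytes."""
    return 4 * ((n_bytes + 2) // 3)


raw = bytes(range(256)) * 4  # stand-in for image bytes, includes non-text values
encoded = base64.b64encode(raw).decode("ascii")  # text that fits a STRING column
decoded = base64.b64decode(encoded)

assert decoded == raw  # the round trip is lossless
print(len(raw), len(encoded))  # 1024 1368
```

The roughly one-third size overhead is worth keeping in mind when estimating how much data each mapper handles.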

Step 5: Resize the image in a MaxFrame UDF

@with_python_requirements distributes Pillow, pandas, and cloudpickle to the workers.

from maxframe.udf import with_python_requirements


@with_python_requirements("pillow", "pandas", "cloudpickle")
def apply_func(row):
    import base64
    import io

    from PIL import Image

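    # Normalize to RGB so the black fill color and the BMP encoder work for every input mode.
    src_image = Image.open(io.BytesIO(base64.b64decode(row.iloc[-1]))).convert("RGB")
    canvas = Image.new(src_image.mode, (150, 150), (0, 0, 0))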
    scale = 150.0 / max(src_image.size)
    resized = src_image.resize(
        tuple(int(s * scale) for s in src_image.size)
    )
    canvas.paste(resized, (0, 0))
    sink = io.BytesIO()
    canvas.save(sink, "bmp")
    row = row.copy()
    row.iloc[-1] = base64.b64encode(sink.getvalue()).decode()
    return row
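
The geometry inside the UDF can be examined without Pillow. The sketch below is a variation rather than the UDF's exact code: fit_size is a hypothetical helper that uses integer arithmetic instead of a float scale factor and clamps each side to at least one pixel, so that very elongated images do not collapse to a zero-sized dimension.

```python
def fit_size(size, box=150):
    """Scale (width, height) so the longer side equals `box`, keeping aspect ratio."""
    width, height = size
    longest = max(width, height)
    if longest <= 0:
        raise ValueError("image dimensions must be positive")
    # Integer arithmetic: floor(side * box / longest), never less than 1 pixel.
    return (max(1, width * box // longest), max(1, height * box // longest))


for size in [(300, 200), (500, 375), (100, 400), (2000, 3)]:
    print(size, "->", fit_size(size))
```

Because the UDF pastes the resized image at (0, 0), the image sits in the top-left corner of the 150×150 canvas and the remaining area stays black.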

Step 6: Run apply on MaxCompute

apply_df = df.apply(
    apply_func,
    axis=1,
    dtypes=df.dtypes,
    output_type="dataframe",
)
print(apply_df.execute().fetch())

Step 7: Write the result to Paimon (DLF catalog)

Install the Paimon Python bindings and PyArrow in the client environment that runs this cell. The catalog uses the dlf-paimon metastore type.

import pyarrow as pa
from paimon_python_api import Schema
from paimon_python_java import Catalog

catalog_options = {
    "metastore": "dlf-paimon",
    "dlf.endpoint": dlf_endpoint,
    "dlf.region": dlf_region,
    "dlf.catalog.id": dlf_catalog_id,
    "dlf.catalog.accessKeyId": dlf_catalog_access_key_id,
    "dlf.catalog.accessKeySecret": dlf_catalog_access_key_secret,
}
catalog = Catalog.create(catalog_options)

pandas_df = apply_df.to_pandas()
record_batch = pa.RecordBatch.from_pandas(pandas_df)
schema = Schema(record_batch.schema)

fq_paimon = f"{dlf_database_name}.{paimon_table_name}"
catalog.create_table(fq_paimon, schema, True)
table = catalog.get_table(fq_paimon)

write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

table_write.write_arrow_batch(record_batch)
commit_messages = table_write.prepare_commit()
table_commit.commit(commit_messages)

table_write.close()
table_commit.close()
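
To avoid hardcoding the AccessKey pair in a notebook, the catalog options could be assembled from environment variables. This is a sketch under stated assumptions: the variable names DLF_ACCESS_KEY_ID and DLF_ACCESS_KEY_SECRET and the helpers dlf_catalog_options and redacted are hypothetical; the option keys and the endpoint pattern are copied from this tutorial.

```python
def dlf_catalog_options(env, region, catalog_id):
    """Assemble the catalog options, taking the AccessKey pair from `env`."""
    # Hypothetical variable names; use whatever your secret store exposes.
    key_id = env.get("DLF_ACCESS_KEY_ID")
    key_secret = env.get("DLF_ACCESS_KEY_SECRET")
    if not key_id or not key_secret:
        raise KeyError("DLF_ACCESS_KEY_ID and DLF_ACCESS_KEY_SECRET must be set")
    return {
        "metastore": "dlf-paimon",
        "dlf.endpoint": f"dlfnext-vpc.{region}.aliyuncs.com",
        "dlf.region": region,
        "dlf.catalog.id": catalog_id,
        "dlf.catalog.accessKeyId": key_id,
        "dlf.catalog.accessKeySecret": key_secret,
    }


def redacted(options):
    """Copy of the options that is safe to print: AccessKey fields are masked."""
    return {k: ("***" if "accessKey" in k else v) for k, v in options.items()}


demo_env = {"DLF_ACCESS_KEY_ID": "id", "DLF_ACCESS_KEY_SECRET": "secret"}
demo_options = dlf_catalog_options(demo_env, "cn-shanghai", "my-catalog")
print(redacted(demo_options))
```

In a real notebook, os.environ would take the place of demo_env, and the resulting dictionary would be passed to Catalog.create.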

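Step 8: Read back from MaxCompute and visualize

The example reads the data through an external project bound to the lake catalog. Adjust the fully qualified name to match your deployment.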

import base64
import io

import matplotlib.pyplot as plt
from PIL import Image

sql = (
    f"SELECT data FROM {external_project_name}."
    f"{dlf_database_name}.{paimon_table_name}"
)
with o.execute_sql(sql, hints=options.sql.settings).open_reader(
    tunnel=False
) as reader:
    rec = next(reader)

buf = io.BytesIO(base64.b64decode(rec[-1]))
img = Image.open(buf)
plt.imshow(img)
plt.show()

buf.seek(0)
image = Image.open(buf)
width, height = image.size
raw_bytes = base64.b64decode(rec[-1])
print(f"Result size (bytes): {len(raw_bytes)}")
print(f"Result width: {width}px, height: {height}px")
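
As a quick sanity check on the printed result size, the expected byte count of the output image can be estimated. The sketch assumes the BMP is written as uncompressed 24-bit RGB with a 54-byte header (14-byte file header plus 40-byte info header); other modes or header variants would give a different number. bmp_size_24bit is a hypothetical helper.

```python
def bmp_size_24bit(width, height):
    """File size of an uncompressed 24-bit BMP with a 54-byte header."""
    row_bytes = (width * 3 + 3) // 4 * 4  # each pixel row is padded to a multiple of 4 bytes
    return 54 + row_bytes * height


print(bmp_size_24bit(150, 150))  # 67854
```

A result size far from this estimate would suggest that the stored payload is not the 150×150 canvas the UDF was meant to produce.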

Step 9: Close the session

session.destroy()
print("Session closed")
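
If an earlier step raises, the call to session.destroy() above is never reached. One way to guard against that is a small context manager; the sketch below uses a stand-in FakeSession class so that it runs without a MaxCompute connection, and destroying is a hypothetical helper name.

```python
from contextlib import contextmanager


@contextmanager
def destroying(session):
    """Yield the session and call session.destroy() on exit, even after an error."""
    try:
        yield session
    finally:
        session.destroy()


class FakeSession:
    """Stand-in used only to demonstrate the pattern without a real session."""

    def __init__(self):
        self.destroyed = False

    def destroy(self):
        self.destroyed = True


fake = FakeSession()
try:
    with destroying(fake):
        raise RuntimeError("simulated failure in a pipeline step")
except RuntimeError:
    pass
print(fake.destroyed)  # True
```

With a real session, the pipeline steps would run inside the with block in place of the simulated failure.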

Further reading

Use the Run this tutorial on DataWorks Notebook button at the top of the page to open the gallery entry, which contains the notebook and template assets.