Multimodal image pipeline with OpenLake, Object Tables, and MaxFrame

For multimodal analytics, unstructured data at scale needs both catalog metadata and a distributed runtime. MaxCompute object tables capture OSS object metadata automatically, and MaxFrame runs Python transforms over the objects that the table lists. Combined with OpenLake (DLF + Paimon), this lets you resize images in parallel and land the curated bytes in a lake table for retrieval, AI Functions, or downstream training.

This walkthrough uses a public image folder on OSS: metadata is registered in an object table, a small UDF fits each selected image onto a 150×150 BMP canvas, and the results are written to Paimon through a DLF-backed catalog.

End-to-end flow

OSS images  →  Object table (metadata)  →  MaxFrame SQL / DataFrame
     →  UDF resize (distributed apply)
     →  Paimon on DLF (OpenLake)
     →  optional read-back for QA / retrieval prep

Prerequisites

MaxCompute

  • A MaxCompute project with the three-layer model enabled (project / schema / object). See Schemas.

  • Endpoints for the API and Tunnel that match your network (VPC *.aliyun-inc.com or public *.aliyun.com). See Obtain endpoints.

  • RAM or default credential chain usable from notebooks (the sample uses CredentialProviderAccount with DefaultCredentialsProvider).

  • Optional external project name if you read the Paimon table through a MaxCompute external project (external_project_name in the sample).

OpenLake: DLF + Paimon

  • A DLF catalog in the region you set as dlf_region, with an AccessKey pair or another supported credential for the catalog APIs. See Data Lake Formation.

  • Paimon Java/Python client libraries installed in the environment where you run the write step (for example paimon_python_java, paimon_python_api, pyarrow). Package names and install steps follow your OpenLake onboarding guide.

Configuration placeholders

project_name = "[your-project-name]"
external_project_name = "[your-external-project-name]"
region = "cn-shanghai"
table_schema = "[your-schema]"
object_table_name = "[your-object-table]"
paimon_table_name = "[your-paimon-table]"

endpoint = f"http://service.{region}.maxcompute.aliyun-inc.com/api"
tunnel_endpoint = f"http://dt.{region}.maxcompute.aliyun-inc.com"

dlf_region = "[dlf-region]"
dlf_catalog_id = "[dlf-catalog-id]"
dlf_database_name = "default"
dlf_catalog_access_key_id = "[dlf-access-key-id]"
dlf_catalog_access_key_secret = "[dlf-access-key-secret]"

dlf_endpoint = f"dlfnext-vpc.{dlf_region}.aliyuncs.com"
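
Before running the later steps, it can help to fail fast on values that are still bracketed placeholders and to derive both MaxCompute endpoints from one switch. The sketch below is an illustration, not part of the sample notebook: find_unfilled and maxcompute_endpoints are hypothetical helper names, and the public host pattern is inferred from the *.aliyun.com note in the prerequisites, so confirm it against the endpoint list for your region.

```python
import re

# Matches values that still look like "[your-project-name]".
_PLACEHOLDER = re.compile(r"^\[[^\]]+\]$")


def find_unfilled(config):
    """Return the names of settings whose value is still a bracketed placeholder."""
    return sorted(
        name
        for name, value in config.items()
        if isinstance(value, str) and _PLACEHOLDER.match(value)
    )


def maxcompute_endpoints(region, use_vpc=True):
    """Build (api_endpoint, tunnel_endpoint) for a region.

    Assumption: VPC hosts end in aliyun-inc.com and public hosts end in
    aliyun.com, as described in the prerequisites.
    """
    domain = "aliyun-inc.com" if use_vpc else "aliyun.com"
    api = f"http://service.{region}.maxcompute.{domain}/api"
    tunnel = f"http://dt.{region}.maxcompute.{domain}"
    return api, tunnel


settings = {
    "project_name": "[your-project-name]",
    "region": "cn-shanghai",
    "dlf_database_name": "default",
}
print(find_unfilled(settings))
print(maxcompute_endpoints("cn-shanghai"))
```

An empty list from find_unfilled means every checked value has been filled in; anything else names the settings to fix before opening a session.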

Step 1. Preview a public OSS object (optional)

The sample reads a single JPEG from a public demo prefix using anonymous OSS auth. Replace bucket_name and object_key with your own layout when you move to private data.

import io

import matplotlib.pyplot as plt
import oss2
from PIL import Image

# Same public bucket that the object table in Step 3 points at.
bucket_name = f"dataworks-dataset-{region}"
object_key = "public-datasets/L1_Multimodal/cats-vs-sheeps/cat.1.jpg"

bucket = oss2.Bucket(
    oss2.AnonymousAuth(),
    f"oss-{region}-internal.aliyuncs.com",
    bucket_name,
)

image_data = bucket.get_object(object_key).read()
image = plt.imread(io.BytesIO(image_data), format="jpeg")
plt.imshow(image)
plt.axis("off")
plt.show()

meta = bucket.head_object(object_key)
content_length = meta.headers.get("Content-Length")
print(f"Original size (bytes): {content_length}")

image = Image.open(io.BytesIO(image_data))
width, height = image.size
print(f"Original width: {width}px, height: {height}px")

Step 2. Open a MaxFrame session

The SQL settings turn on schema mode and tune object-table splitting for concurrency. The options that follow also turn off MCQA (enable_mcqa) and automatic use of the common image (auto_use_common_image) for this lab-style job.

from alibabacloud_credentials import providers
from maxframe import new_session, options
import maxframe.dataframe as md
from odps import ODPS
from odps.accounts import CredentialProviderAccount

options.sql.settings = {
    "odps.namespace.schema": "true",
    "odps.task.major.version": "default",
    "odps.sql.allow.namespace.schema": "true",
    "odps.sql.auto.merge.enabled": "false",
    "odps.sql.object.table.split.by.object.size.enabled": "true",
    "odps.sql.object.table.split.unit.kb": "1000",
    "odps.sql.offline.result.cache.enable": "false",
    "odps.sql.split.v2": "false",
    "odps.stage.mapper.split.size": "10",
    "odps.sql.type.system.odps2": "true",
}

options.sql.enable_mcqa = False
options.sql.auto_use_common_image = False
options.session.enable_schema = True

account = CredentialProviderAccount(providers.DefaultCredentialsProvider())
o = ODPS(
    account=account,
    project=project_name,
    endpoint=endpoint,
    tunnel_endpoint=tunnel_endpoint,
)

session = new_session(o)
print(f"MaxFrame session id: {session.session_id}")
print(session.get_logview_address())

Step 3. Create and refresh the object table

Point LOCATION at the OSS prefix that stores images. Refresh metadata before querying keys from MaxFrame.

oss_prefix = (
    "oss://oss-cn-shanghai-internal.aliyuncs.com/"
    "dataworks-dataset-cn-shanghai/public-datasets/L1_Multimodal/cats-vs-sheeps/"
)
fq_ot = f"{project_name}.{table_schema}.{object_table_name}"

o.execute_sql(
    f"CREATE OBJECT TABLE IF NOT EXISTS {fq_ot} LOCATION '{oss_prefix}'",
    hints=options.sql.settings,
)
o.execute_sql(
    f"ALTER TABLE {fq_ot} REFRESH METADATA",
    hints=options.sql.settings,
)
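
The sample hard-codes cn-shanghai in oss_prefix while other cells derive values from the region variable. A minimal sketch of composing the location from the same inputs follows. build_oss_location is a hypothetical helper, and the host pattern is copied from the sample above rather than taken from a reference, so treat it as an assumption.

```python
def build_oss_location(region, bucket, prefix, internal=True):
    """Compose an oss:// location string for an object table.

    Assumption: the host follows the oss-{region}[-internal].aliyuncs.com
    pattern used in this walkthrough.
    """
    host = f"oss-{region}{'-internal' if internal else ''}.aliyuncs.com"
    # Normalize slashes so the result always ends with exactly one "/".
    clean_prefix = prefix.strip("/")
    if not clean_prefix:
        return f"oss://{host}/{bucket}/"
    return f"oss://{host}/{bucket}/{clean_prefix}/"


location = build_oss_location(
    "cn-shanghai",
    "dataworks-dataset-cn-shanghai",
    "/public-datasets/L1_Multimodal/cats-vs-sheeps",
)
print(location)
```

The printed value matches the oss_prefix string used in this step, so switching regions only requires changing the arguments.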

Step 4. Inspect object metadata and one image payload

ot_sample = (
    md.read_odps_query(f"SELECT key, size, type, owner_id FROM {fq_ot}")
    .execute()
    .fetch()
)
print(ot_sample.head(12))

df = md.read_odps_query(
    f"SELECT key, "
    f"base64(get_data_from_oss('{fq_ot}', key)) AS data "
    f"FROM {fq_ot} WHERE key = 'cat.1.jpg'",
    index_col="key",
)
print(df.execute().fetch())

get_data_from_oss is a MaxCompute SQL helper that reads object bytes for a given key; the notebook encodes them as base64 so the Python UDF can decode without mounting OSS inside the mapper.
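
Base64 maps every 3 input bytes to 4 ASCII characters, so each payload grows by roughly one third while it travels through the query as text. The standard-library sketch below illustrates the round trip and the size cost on synthetic bytes; it does not call MaxCompute, and base64_length is a hypothetical helper name.

```python
import base64
import os

payload = os.urandom(30_000)          # stand-in for one image's raw bytes
encoded = base64.b64encode(payload)   # text form, comparable to the "data" column
decoded = base64.b64decode(encoded)   # what the UDF recovers before opening the image


def base64_length(n_bytes):
    """Length of the padded base64 encoding of n_bytes bytes."""
    return 4 * ((n_bytes + 2) // 3)


print(len(payload), len(encoded), base64_length(len(payload)))
```

The overhead is worth keeping in mind when choosing split sizes, because the bytes a worker handles are larger than the object sizes reported by the object table.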

Step 5. Resize images in a MaxFrame UDF

@with_python_requirements ships Pillow, pandas, and cloudpickle to workers.

from maxframe.udf import with_python_requirements


@with_python_requirements("pillow", "pandas", "cloudpickle")
def apply_func(row):
    import base64
    import io

    from PIL import Image

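    src_image = Image.open(io.BytesIO(base64.b64decode(row.iloc[-1])))
    # Normalize to RGB so the (0, 0, 0) fill is valid for grayscale,
    # palette, and CMYK inputs and the result can be saved as BMP.
    src_image = src_image.convert("RGB")
    canvas = Image.new("RGB", (150, 150), (0, 0, 0))
    # Scale so the longer side becomes 150 px, keeping the aspect ratio.
    scale = 150.0 / max(src_image.size)
    resized = src_image.resize(
        tuple(max(1, int(s * scale)) for s in src_image.size)
    )
    # Paste at the top-left corner; the rest of the canvas stays black.
    canvas.paste(resized, (0, 0))
    sink = io.BytesIO()
    canvas.save(sink, "BMP")
    # Return a copy with the payload replaced by the base64-encoded BMP.
    row = row.copy()
    row.iloc[-1] = base64.b64encode(sink.getvalue()).decode()
    return row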
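
The UDF scales each image so that its longer side becomes 150 px and pastes the result at the top-left of a black 150×150 canvas, so non-square images keep their aspect ratio and leave padding on the right or at the bottom. The sketch below works through that arithmetic without Pillow. fit_within is a hypothetical helper; it uses integer division and clamps each side to at least 1 px, so in rare rounding cases it can differ by one pixel from the float expression in the UDF.

```python
def fit_within(size, target=150):
    """Scale (width, height) so the longer side equals target, keeping aspect ratio."""
    width, height = size
    longest = max(width, height)
    if longest <= 0:
        raise ValueError("image dimensions must be positive")
    # Clamp to at least 1 pixel so very thin images stay valid.
    return (
        max(1, width * target // longest),
        max(1, height * target // longest),
    )


for size in [(500, 375), (375, 500), (100, 100), (3000, 10)]:
    print(size, "->", fit_within(size))
```

A 500×375 source therefore occupies 150×112 pixels of the canvas, and the remaining 38 rows stay black.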

Step 6. Run apply on MaxCompute

apply_df = df.apply(
    apply_func,
    axis=1,
    dtypes=df.dtypes,
    output_type="dataframe",
)
print(apply_df.execute().fetch())

Step 7. Write the result to Paimon (DLF catalog)

Install the Paimon Python bindings and PyArrow in the client environment that runs this cell. The catalog uses the dlf-paimon metastore type.

import pyarrow as pa
from paimon_python_api import Schema
from paimon_python_java import Catalog

catalog_options = {
    "metastore": "dlf-paimon",
    "dlf.endpoint": dlf_endpoint,
    "dlf.region": dlf_region,
    "dlf.catalog.id": dlf_catalog_id,
    "dlf.catalog.accessKeyId": dlf_catalog_access_key_id,
    "dlf.catalog.accessKeySecret": dlf_catalog_access_key_secret,
}
catalog = Catalog.create(catalog_options)

pandas_df = apply_df.to_pandas()
record_batch = pa.RecordBatch.from_pandas(pandas_df)
schema = Schema(record_batch.schema)

fq_paimon = f"{dlf_database_name}.{paimon_table_name}"
catalog.create_table(fq_paimon, schema, True)
table = catalog.get_table(fq_paimon)

write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()

table_write.write_arrow_batch(record_batch)
commit_messages = table_write.prepare_commit()
table_commit.commit(commit_messages)

table_write.close()
table_commit.close()
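
Keeping the AccessKey pair in notebook variables makes it easy to leak. One option, sketched here under the assumption that the pair is exported as ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET, is to assemble the catalog options from the environment. dlf_catalog_options is a hypothetical helper; the option keys and the endpoint pattern are the ones used in this walkthrough.

```python
import os

_KEY_ID = "ALIBABA_CLOUD_ACCESS_KEY_ID"
_KEY_SECRET = "ALIBABA_CLOUD_ACCESS_KEY_SECRET"


def dlf_catalog_options(region, catalog_id, env=None):
    """Build the Paimon catalog options dict with credentials read from env."""
    env = os.environ if env is None else env
    missing = [name for name in (_KEY_ID, _KEY_SECRET) if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing environment variables: {', '.join(missing)}")
    return {
        "metastore": "dlf-paimon",
        "dlf.endpoint": f"dlfnext-vpc.{region}.aliyuncs.com",
        "dlf.region": region,
        "dlf.catalog.id": catalog_id,
        "dlf.catalog.accessKeyId": env[_KEY_ID],
        "dlf.catalog.accessKeySecret": env[_KEY_SECRET],
    }


# Usage with an explicit mapping, so the example runs without real credentials.
demo_env = {_KEY_ID: "demo-id", _KEY_SECRET: "demo-secret"}
demo_options = dlf_catalog_options("cn-shanghai", "demo-catalog", env=demo_env)
print(sorted(demo_options))
```

In a notebook you would call dlf_catalog_options(dlf_region, dlf_catalog_id) without the env argument and pass the result to Catalog.create.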

Step 8. Read back from MaxCompute and visualize

The sample reads through an external project bound to the lake catalog. Adjust the fully qualified name to match your deployment.

import base64
import io

import matplotlib.pyplot as plt
from PIL import Image

sql = (
    f"SELECT data FROM {external_project_name}."
    f"{dlf_database_name}.{paimon_table_name}"
)
with o.execute_sql(sql, hints=options.sql.settings).open_reader(
    tunnel=False
) as reader:
    rec = next(reader)

# Decode the stored payload once and reuse it for display and size checks.
raw_bytes = base64.b64decode(rec[-1])
img = Image.open(io.BytesIO(raw_bytes))
plt.imshow(img)
plt.show()

width, height = img.size
print(f"Result size (bytes): {len(raw_bytes)}")
print(f"Result width: {width}px, height: {height}px")
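
For QA over many rows, opening every payload with Pillow may be more than is needed, because a BMP file records its dimensions in a fixed-position header. The sketch below reads them with the standard library only. bmp_dimensions is a hypothetical helper, and it assumes the common 40-byte BITMAPINFOHEADER layout or a later header version that extends it.

```python
import struct


def bmp_dimensions(data):
    """Return (width, height, bits_per_pixel) from a BMP byte string."""
    if len(data) < 30 or data[:2] != b"BM":
        raise ValueError("not a BMP payload")
    header_size = struct.unpack_from("<I", data, 14)[0]
    if header_size < 40:
        raise ValueError("unsupported BMP header")
    width, height = struct.unpack_from("<ii", data, 18)
    bits_per_pixel = struct.unpack_from("<H", data, 28)[0]
    # A negative height marks a top-down bitmap; report its magnitude.
    return width, abs(height), bits_per_pixel


# Synthetic 54-byte header for a 150x150, 24-bit image (no pixel data).
header = b"BM" + struct.pack("<IHHI", 54, 0, 0, 54)
header += struct.pack("<IiiHHIIiiII", 40, 150, 150, 1, 24, 0, 0, 0, 0, 0, 0)
print(bmp_dimensions(header))
```

Applied to the decoded payload from the read-back, the first two values should both be 150 if the resize step behaved as intended.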

Step 9. Close the session

session.destroy()
print("Session closed")

Further reading

Use the "Run this tutorial on DataWorks Notebook" button at the top of the page to open the gallery entry that bundles the notebook and template assets.