.. _examples_openlake_multimodal_maxframe: Multimodal image pipeline with OpenLake, Object Tables, and MaxFrame ==================================================================== For multimodal analytics, **unstructured data at scale** needs both catalog metadata and a distributed runtime. **MaxCompute Object tables** capture OSS object metadata automatically, while **MaxFrame** runs Python transforms on that catalog. Combined with **OpenLake** (DLF + **Paimon**), you can resize images in parallel and land curated bytes in a lake table for retrieval, **AI Functions**, or downstream training. This walkthrough uses a **public image folder** on OSS: metadata is registered in an object table, a small **UDF** resizes each image to 150×150 BMP, and results are written to **Paimon** through a DLF-backed catalog. .. only:: html .. raw:: html
Run this tutorial on DataWorks Notebook
End-to-end flow --------------- .. code-block:: text OSS images → Object table (metadata) → MaxFrame SQL / DataFrame → UDF resize (distributed apply) → Paimon on DLF (OpenLake) → optional read-back for QA / retrieval prep Prerequisites ------------- MaxCompute ^^^^^^^^^^ - A **MaxCompute project** with the **three-layer model** enabled (project / **schema** / object). See `Schemas `__. - **Endpoints** for the API and Tunnel that match your network (VPC ``*.aliyun-inc.com`` or public ``*.aliyun.com``). See `Obtain endpoints `__. - **RAM or default credential chain** usable from notebooks (the sample uses ``CredentialProviderAccount`` with ``DefaultCredentialsProvider``). - Optional **external project** name if you read the Paimon table through a MaxCompute **external project** (``external_project_name`` in the sample). OpenLake: DLF + Paimon ^^^^^^^^^^^^^^^^^^^^^^^ - A **DLF catalog** in the same region you configure, with **AccessKey** pair or another supported credential for catalog APIs. See `Data Lake Formation `__. - **Paimon** Java/Python client libraries installed in the environment where you run the write step (for example ``paimon_python_java``, ``paimon_python_api``, ``pyarrow``). Package names and install steps follow your OpenLake onboarding guide. Configuration placeholders ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ .. code-block:: python project_name = "[your-project-name]" external_project_name = "[your-external-project-name]" region = "cn-shanghai" table_schema = "[your-schema]" object_table_name = "[your-object-table]" paimon_table_name = "[your-paimon-table]" endpoint = f"http://service.{region}.maxcompute.aliyun-inc.com/api" tunnel_endpoint = f"http://dt.{region}.maxcompute.aliyun-inc.com" dlf_region = "[dlf-region]" dlf_catalog_id = "[dlf-catalog-id]" dlf_database_name = "default" dlf_catalog_access_key_id = "[dlf-access-key-id]" dlf_catalog_access_key_secret = "[dlf-access-key-secret]" dlf_endpoint = f"dlfnext-vpc.{dlf_region}.aliyuncs.com" Step 1. Preview a public OSS object (optional) ------------------------------------------------ The sample reads a single JPEG from a **public** demo prefix using **anonymous** OSS auth. Replace ``bucket_name`` and ``object_key`` with your own layout when you move to private data. .. code-block:: python import io import matplotlib.pyplot as plt import oss2 from PIL import Image bucket_name = f"dataworks-notebook-{region}" object_key = "public-datasets/L1_Multimodal/cats-vs-sheeps/cat.1.jpg" bucket = oss2.Bucket( oss2.AnonymousAuth(), f"oss-{region}-internal.aliyuncs.com", bucket_name, ) image_data = bucket.get_object(object_key).read() image = plt.imread(io.BytesIO(image_data), format="jpeg") plt.imshow(image) plt.axis("off") plt.show() meta = bucket.head_object(object_key) content_length = meta.headers.get("Content-Length") print(f"Original size (bytes): {content_length}") image = Image.open(io.BytesIO(image_data)) width, height = image.size print(f"Original width: {width}px, height: {height}px") Step 2. Open a MaxFrame session ------------------------------- Session flags turn on **schema mode**, tune **object-table splitting** for concurrency, and disable features that conflict with this lab-style job. .. code-block:: python from alibabacloud_credentials import providers from maxframe import new_session, options import maxframe.dataframe as md from odps import ODPS from odps.accounts import CredentialProviderAccount options.sql.settings = { "odps.namespace.schema": "true", "odps.task.major.version": "default", "odps.sql.allow.namespace.schema": "true", "odps.sql.auto.merge.enabled": "false", "odps.sql.object.table.split.by.object.size.enabled": "true", "odps.sql.object.table.split.unit.kb": "1000", "odps.sql.offline.result.cache.enable": "false", "odps.sql.split.v2": "false", "odps.stage.mapper.split.size": "10", "odps.sql.type.system.odps2": "true", } options.sql.enable_mcqa = False options.sql.auto_use_common_image = False options.session.enable_schema = True account = CredentialProviderAccount(providers.DefaultCredentialsProvider()) o = ODPS( account=account, project=project_name, endpoint=endpoint, tunnel_endpoint=tunnel_endpoint, ) session = new_session(o) print(f"MaxFrame session id: {session.session_id}") print(session.get_logview_address()) Step 3. Create and refresh the object table ------------------------------------------- Point ``LOCATION`` at the OSS prefix that stores images. **Refresh metadata** before querying keys from MaxFrame. .. code-block:: python oss_prefix = ( "oss://oss-cn-shanghai-internal.aliyuncs.com/" "dataworks-dataset-cn-shanghai/public-datasets/L1_Multimodal/cats-vs-sheeps/" ) fq_ot = f"{project_name}.{table_schema}.{object_table_name}" o.execute_sql( f"CREATE OBJECT TABLE IF NOT EXISTS {fq_ot} LOCATION '{oss_prefix}'", hints=options.sql.settings, ) o.execute_sql( f"ALTER TABLE {fq_ot} REFRESH METADATA", hints=options.sql.settings, ) Step 4. Inspect object metadata and one image payload ------------------------------------------------------ .. code-block:: python ot_sample = ( md.read_odps_query(f"SELECT key, size, type, owner_id FROM {fq_ot}") .execute() .fetch() ) print(ot_sample.head(12)) df = md.read_odps_query( f"SELECT key, " f"base64(get_data_from_oss('{fq_ot}', key)) AS data " f"FROM {fq_ot} WHERE key = 'cat.1.jpg'", index_col="key", ) print(df.execute().fetch()) ``get_data_from_oss`` is a MaxCompute SQL helper that reads object bytes for a given key; the notebook encodes them as **base64** so the Python UDF can decode without mounting OSS inside the mapper. Step 5. Resize images in a MaxFrame UDF --------------------------------------- ``@with_python_requirements`` ships **Pillow**, **pandas**, and **cloudpickle** to workers. .. code-block:: python from maxframe.udf import with_python_requirements @with_python_requirements("pillow", "pandas", "cloudpickle") def apply_func(row): import base64 import io from PIL import Image src_image = Image.open(io.BytesIO(base64.b64decode(row.iloc[-1]))) canvas = Image.new(src_image.mode, (150, 150), (0, 0, 0)) scale = 150.0 / max(src_image.size) resized = src_image.resize( tuple(int(s * scale) for s in src_image.size) ) canvas.paste(resized, (0, 0)) sink = io.BytesIO() canvas.save(sink, "bmp") row = row.copy() row.iloc[-1] = base64.b64encode(sink.getvalue()).decode() return row Step 6. Run ``apply`` on MaxCompute ----------------------------------- .. code-block:: python apply_df = df.apply( apply_func, axis=1, dtypes=df.dtypes, output_type="dataframe", ) print(apply_df.execute().fetch()) Step 7. Write the result to Paimon (DLF catalog) ------------------------------------------------ Install the **Paimon** Python bindings and **PyArrow** in the client environment that runs this cell. The catalog uses the **``dlf-paimon``** metastore type. .. code-block:: python import pyarrow as pa from paimon_python_api import Schema from paimon_python_java import Catalog catalog_options = { "metastore": "dlf-paimon", "dlf.endpoint": dlf_endpoint, "dlf.region": dlf_region, "dlf.catalog.id": dlf_catalog_id, "dlf.catalog.accessKeyId": dlf_catalog_access_key_id, "dlf.catalog.accessKeySecret": dlf_catalog_access_key_secret, } catalog = Catalog.create(catalog_options) pandas_df = apply_df.to_pandas() record_batch = pa.RecordBatch.from_pandas(pandas_df) schema = Schema(record_batch.schema) fq_paimon = f"{dlf_database_name}.{paimon_table_name}" catalog.create_table(fq_paimon, schema, True) table = catalog.get_table(fq_paimon) write_builder = table.new_batch_write_builder() table_write = write_builder.new_write() table_commit = write_builder.new_commit() table_write.write_arrow_batch(record_batch) commit_messages = table_write.prepare_commit() table_commit.commit(commit_messages) table_write.close() table_commit.close() Step 8. Read back from MaxCompute and visualize ----------------------------------------------- The sample reads through an **external project** bound to the lake catalog. Adjust the fully qualified name to match your deployment. .. code-block:: python import base64 import io import matplotlib.pyplot as plt from PIL import Image sql = ( f"SELECT data FROM {external_project_name}." f"{dlf_database_name}.{paimon_table_name}" ) with o.execute_sql(sql, hints=options.sql.settings).open_reader( tunnel=False ) as reader: rec = next(reader) buf = io.BytesIO(base64.b64decode(rec[-1])) img = Image.open(buf) plt.imshow(img) plt.show() buf.seek(0) image = Image.open(buf) width, height = image.size raw_bytes = base64.b64decode(rec[-1]) print(f"Result size (bytes): {len(raw_bytes)}") print(f"Result width: {width}px, height: {height}px") Step 9. Close the session ------------------------- .. code-block:: python session.destroy() print("Session closed") Further reading --------------- - `View and use custom images `__ if you extend the UDF with heavier dependencies. - `Schemas and namespace mode `__ for three-layer project layout. Use the **Run this tutorial on DataWorks Notebook** button at the top for the gallery entry that bundles the notebook and template assets.