End-to-end video processing: frame extraction -> labeling -> vectorization

Available since MaxFrame 2.6.0.

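Background

Autonomous vehicles continuously collect large volumes of camera video during development and operation. This footage covers different road types, traffic flows, weather and lighting conditions, and complex events. It underpins model training, scenario mining, data closed loops, and evaluation.

However, raw video usually lacks structured labels, and manual screening and curation are expensive. As a result, high-value samples are hard to find and the data is seldom reused efficiently. To manage and use large video collections more efficiently, the videos need frame-level understanding and standardized processing:

  1. Frame extraction: use FFmpeg to sample frames as images at a specified frame rate.

  2. Image labeling: call a Model Studio (Bailian) multimodal model to generate scene labels and semantic descriptions automatically.

  3. Vectorization: compute embedding vectors for each frame image and its label. Together these form a unified feature base for retrieval, clustering, sample selection, and analysis.

Built on MaxFrame's distributed processing, this solution runs the whole pipeline in batch. It turns unstructured video into data assets that can be searched, analyzed, and reused.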

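Use cases

  • Automatic labeling and management of large video collections.

  • Similar-scene retrieval and sample recall.

  • Discovery of anomalous events and long-tail scenarios.

  • Clustering analysis and distribution assessment.

  • Support for training, evaluation, and data closed loops.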
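For the clustering use case, the embeddings produced in stage 3 can be grouped offline. The sketch below is a minimal k-means in NumPy with a deterministic farthest-point initialization. The function name kmeans is hypothetical. It uses Euclidean distance and builds an n × k × d distance array, so it suits samples rather than full datasets. At scale, a dedicated library such as scikit-learn is likely a better fit.

```python
import numpy as np


def kmeans(x: np.ndarray, k: int, iters: int = 50) -> tuple[np.ndarray, np.ndarray]:
    """Cluster the rows of x into k groups; returns (labels, centers)."""
    x = np.asarray(x, dtype=np.float64)
    # Deterministic farthest-point initialization: start from the first row,
    # then repeatedly add the row that is farthest from all chosen centers.
    centers = [x[0]]
    for _ in range(1, k):
        dists = np.min(
            np.linalg.norm(x[:, None, :] - np.array(centers)[None, :, :], axis=2), axis=1
        )
        centers.append(x[np.argmax(dists)])
    centers = np.array(centers)

    labels = np.zeros(len(x), dtype=np.int64)
    for _ in range(iters):
        # Assign every row to its nearest center (Euclidean distance).
        dists = np.linalg.norm(x[:, None, :] - centers[None, :, :], axis=2)
        labels = np.argmin(dists, axis=1)
        # Recompute centers; keep the previous center if a cluster becomes empty.
        new_centers = np.array([
            x[labels == j].mean(axis=0) if np.any(labels == j) else centers[j]
            for j in range(k)
        ])
        if np.allclose(new_centers, centers):
            break  # converged
        centers = new_centers
    return labels, centers
```

Suppose the stage 3 rows are loaded into a pandas DataFrame df, filtered to status == "succeed". Decode the JSON strings with np.array([json.loads(s) for s in df["image_embedding"]]), call kmeans(x, k), and then inspect which frames fall into each cluster.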

Core workflow

Figure: Video understanding and vectorization workflow

Prerequisites

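  1. MaxCompute activated: you need an activated MaxCompute project and a valid AccessKey ID / AccessKey Secret.

  2. DPE engine activated: the apply_chunk stages run on DPE.

  3. Videos uploaded to OSS: the raw videos are stored in the target OSS bucket.

  4. OSS RAM role authorization: configure the Role ARN used for OSS mount access.

  5. Model Studio (Bailian) API key: used to call the multimodal labeling and embedding models.

  6. Custom image with FFmpeg: stage 1 frame extraction requires a MaxCompute custom image that contains FFmpeg. The image must be in the same region as the MaxCompute project.

  7. MaxFrame SDK version: use MaxFrame SDK 2.6.0 or later (pip install "maxframe>=2.6.0"). Quote the requirement so that the shell does not treat >= as an output redirect.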
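You may want to confirm prerequisite 7 locally before you create a session. The sketch below compares the installed MaxFrame version against 2.6.0 using importlib.metadata from the standard library. The helper names are hypothetical. The simple parser assumes plain X.Y.Z release versions and ignores pre-release suffixes, so 2.6.0rc1 is treated as 2.6.0.

```python
from importlib import metadata


def parse_version(text: str) -> tuple[int, ...]:
    """Turn '2.6.0' into (2, 6, 0); trailing non-digits in a part are dropped."""
    parts = []
    for piece in text.split("."):
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        parts.append(int(digits) if digits else 0)
    return tuple(parts)


def version_at_least(installed: str, required: str) -> bool:
    """Compare two versions after padding both with zeros to the same length."""
    a, b = parse_version(installed), parse_version(required)
    width = max(len(a), len(b))
    a += (0,) * (width - len(a))
    b += (0,) * (width - len(b))
    return a >= b


def check_maxframe(required: str = "2.6.0") -> bool:
    """Return True if the installed maxframe package meets the minimum version."""
    try:
        installed = metadata.version("maxframe")
    except metadata.PackageNotFoundError:
        return False  # maxframe is not installed in this environment
    return version_at_least(installed, required)
```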

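Custom image

Why is a custom image needed?

The default DPE runtime includes only Python, not FFmpeg, but stage 1 frame extraction depends on the FFmpeg executable. For quick validation, you can reuse an FFmpeg image that has already been verified in the same region. For production, we recommend building and publishing your own image in the same region as your MaxCompute project. Then set odps.session.image to that image name.

Sample Dockerfile. It installs FFmpeg and ossfs2, the OSS FUSE client. Place the ossfs2 .deb package in the build context before you run docker build: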

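FROM registry.cn-zhangjiakou.aliyuncs.com/maxcompute_image/ubuntu_20.04:latest

# The ossfs2 package must be in the build context next to this Dockerfile
COPY ossfs2_2.0.3.1_linux_x86_64.deb /tmp/

# Install FFmpeg and ossfs2 in one non-interactive layer, before the apt lists
# are removed, so that dependencies of the local .deb can still be resolved
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
    ffmpeg \
    /tmp/ossfs2_2.0.3.1_linux_x86_64.deb \
    && rm -rf /var/lib/apt/lists/* /tmp/ossfs2_2.0.3.1_linux_x86_64.deb

WORKDIR /workspace
# Python interpreter that MaxFrame uses inside the image
ENV MF_PYTHON_EXECUTABLE="/usr/ali/python3.11.7/bin/python3"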
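You can check whether an image actually provides FFmpeg with a small function like the one below. Run it inside a UDF, or locally in a container started from the image. It uses only shutil.which and subprocess from the standard library. The function name ffmpeg_version is hypothetical.

```python
import shutil
import subprocess
from typing import Optional


def ffmpeg_version(timeout: float = 10.0) -> Optional[str]:
    """Return the first line of `ffmpeg -version`, or None if FFmpeg is unusable."""
    path = shutil.which("ffmpeg")
    if path is None:
        return None  # not on PATH: the image most likely lacks FFmpeg
    try:
        proc = subprocess.run(
            [path, "-version"], capture_output=True, text=True, timeout=timeout
        )
    except (OSError, subprocess.TimeoutExpired):
        return None
    if proc.returncode != 0 or not proc.stdout:
        return None
    return proc.stdout.splitlines()[0]
```

If this returns None inside a DPE task, first check that odps.session.image points to the intended image.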

Environment setup and session creation

import glob
import json
import os
import subprocess

import numpy as np
import pandas as pd
import maxframe.dataframe as md
from maxframe.config import options
from maxframe.session import new_session
from maxframe.udf import with_fs_mount, with_python_requirements, with_running_options
from maxframe.dataframe.utils import parse_index
from odps import ODPS

# Run the session on the custom image that contains FFmpeg
options.sql.settings = {"odps.session.image": "<your_ffmpeg_image_name>"}
# Engine order for the DAG: DPE first, then MaxCompute SQL
options.dag.settings = {"engine_order": ["DPE", "MCSQL"]}
# Allow DPE workers to reach the DashScope intranet endpoint (Beijing region here);
# keep this host in sync with dashscope.base_http_api_url in the UDFs below
options.dpe.settings = {
    "substep.internal_network_whitelist": [
        "intranet-cn-beijing.dashscope.aliyuncs.com:443",
    ],
}

o = ODPS(
    access_id="<your_access_id>",
    secret_access_key="<your_secret_access_key>",
    project="<your_maxcompute_project>",
    endpoint="https://service.<region>.maxcompute.aliyun.com/api",
)
session = new_session(o)
print(f"Session ID : {session.session_id}")
print(f"LogView    : {session.get_logview_address()}")
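Hard-coding the AccessKey pair and the API key in a notebook makes them easy to leak. A common alternative is to read them from environment variables, as sketched below. The helper names and the variable names (ALIBABA_CLOUD_ACCESS_KEY_ID, ALIBABA_CLOUD_ACCESS_KEY_SECRET, DASHSCOPE_API_KEY) are assumptions. Use whatever names your environment defines.

```python
import os


def require_env(name: str) -> str:
    """Return a non-empty environment variable, or fail with a clear message."""
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"environment variable {name} is not set")
    return value


def load_credentials() -> dict:
    """Collect credentials for ODPS(...) and DashScope (variable names are assumptions)."""
    return {
        "access_id": require_env("ALIBABA_CLOUD_ACCESS_KEY_ID"),
        "secret_access_key": require_env("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),
        "dashscope_api_key": require_env("DASHSCOPE_API_KEY"),
    }
```

For example, creds = load_credentials(), then pass creds["access_id"] and creds["secret_access_key"] to ODPS(...), and set DASHSCOPE_API_KEY = creds["dashscope_api_key"].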

Global parameters:

OSS_ENDPOINT = "<your_oss_endpoint>"  # for example: oss-cn-hangzhou.aliyuncs.com
OSS_BUCKET = "<your_oss_bucket>"
OSS_PATH = "<your_video_prefix>/"
ROLE_ARN = "acs:ram::<account_id>:role/<your_role_name>"

DASHSCOPE_API_KEY = "<your_dashscope_api_key>"
LABEL_MODEL = "qwen3-vl-plus"
EMBEDDING_MODEL = "qwen3-vl-embedding"
EMBEDDING_DIM = 1024

LABEL_PROMPT = (
    "This image is captured from an autonomous driving scenario. "
    "Identify factors that may affect driving, including lane quality, signs, "
    "road conditions, obstacles, poor weather, poor lighting, complex traffic "
    "patterns, and unusual events. Provide detailed scene factors only."
)

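Before you submit jobs, make sure the mounted OSS path is correct. For example, suppose your OSS address is oss://oss-cn-hangzhou.aliyuncs.com/jingxuan-oss-test-hz/. Set OSS_ENDPOINT = "oss-cn-hangzhou.aliyuncs.com" and OSS_BUCKET = "jingxuan-oss-test-hz". Set OSS_PATH to "" or to the subdirectory that actually holds the videos, ending with "/" as in the template above. The with_fs_mount decorators read these values when the UDFs are defined, so set them before you run the stage code.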
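The mapping in this example can also be done in code. The sketch below splits an address of the form oss://<endpoint>/<bucket>/<prefix> into the three global parameters. It normalizes the prefix to end with "/", following the OSS_PATH template. The helper name split_oss_uri is hypothetical.

```python
def split_oss_uri(uri: str) -> tuple[str, str, str]:
    """Split 'oss://<endpoint>/<bucket>/<prefix>' into (endpoint, bucket, prefix)."""
    scheme = "oss://"
    if not uri.startswith(scheme):
        raise ValueError(f"not an oss:// URI: {uri!r}")
    parts = uri[len(scheme):].split("/", 2)
    if len(parts) < 2 or not parts[0] or not parts[1]:
        raise ValueError(f"expected oss://<endpoint>/<bucket>/...: {uri!r}")
    endpoint, bucket = parts[0], parts[1]
    prefix = parts[2].strip("/") if len(parts) == 3 else ""
    # An empty prefix mounts the bucket root; otherwise keep a trailing slash.
    return endpoint, bucket, (prefix + "/" if prefix else "")
```

For example: OSS_ENDPOINT, OSS_BUCKET, OSS_PATH = split_oss_uri("oss://oss-cn-hangzhou.aliyuncs.com/jingxuan-oss-test-hz/").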

Stage 1: Video frame extraction

@with_running_options(engine="dpe", cpu=1, memory=4)
@with_fs_mount(
    path=f"oss://{OSS_ENDPOINT}/{OSS_BUCKET}/{OSS_PATH}",
    mount_path="/mnt/data",
    storage_options={"role_arn": ROLE_ARN},
)
def extract_frame(batch_df, fps=2, quality=2, timeout=300):
    # batch_df is only a trigger: a single call scans and processes every video
    video_extensions = (".mp4", ".avi", ".mov")
    frame_columns = ["image_oss_bucket", "image_oss_path", "image_oss_name", "size"]

    # Collect video files under the mount, skipping frames written by earlier runs
    video_files = []
    for root, dirs, files in os.walk("/mnt/data"):
        dirs[:] = [d for d in dirs if d != "output_frames"]
        for f in files:
            if f.lower().endswith(video_extensions):
                video_files.append(os.path.join(root, f))

    all_results = []
    for video_path in video_files:
        video_name_no_ext = os.path.splitext(os.path.basename(video_path))[0]
        # Frames are written to <OSS_PATH>output_frames/<video name>/ on OSS
        output_dir = os.path.join("/mnt/data", "output_frames", video_name_no_ext)
        os.makedirs(output_dir, exist_ok=True)

        cmd = [
            "ffmpeg", "-y",          # overwrite frames left by a previous run
            "-i", video_path,
            "-vf", f"fps={fps}",     # sample `fps` frames per second
            "-q:v", str(quality),    # JPEG quality: lower is better (typically 2-31)
            "-f", "image2",
            os.path.join(output_dir, "%04d.jpg"),
        ]
        try:
            process = subprocess.run(
                cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, timeout=timeout
            )
        except (OSError, subprocess.TimeoutExpired):
            continue  # skip videos that cannot be processed within the timeout
        if process.returncode != 0:
            continue  # skip videos that FFmpeg fails to decode

        # Directory relative to the mount root, e.g. "output_frames/clip/"
        rel_dir = os.path.relpath(output_dir, "/mnt/data") + "/"
        for frame in sorted(glob.glob(os.path.join(output_dir, "*.jpg"))):
            all_results.append([
                OSS_BUCKET,
                rel_dir,
                os.path.basename(frame),
                os.path.getsize(frame),
            ])

    # An empty list still yields a DataFrame with the expected columns
    return pd.DataFrame(all_results, columns=frame_columns)

seed_df = md.DataFrame(pd.DataFrame({"trigger": [1]}))
frame_result = seed_df.mf.apply_chunk(
    extract_frame,
    output_type="dataframe",
    dtypes={
        "image_oss_bucket": "object",
        "image_oss_path": "object",
        "image_oss_name": "object",
        "size": np.int64,
    },
    skip_infer=True,
    index=parse_index(pd.Index([], dtype=np.int64)),
)

md.to_odps_table(frame_result, "mf_video_frame_result", overwrite=True, index=False).execute()
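Stage 1 names frames 0001.jpg, 0002.jpg, and so on under output_frames/<video name>/, sampled at fps frames per second. To trace a labeled frame back to a position in the source video, you can convert its number to an approximate timestamp, as sketched below. The sketch assumes that the first output frame corresponds to t = 0 and that frames are evenly spaced by 1/fps. FFmpeg's fps filter picks the nearest input frame, so the true time may differ slightly. The helper names are hypothetical.

```python
import os
import re


def frame_timestamp(image_oss_name: str, fps: float = 2) -> float:
    """Approximate offset in seconds of a frame named like '0003.jpg'."""
    stem = os.path.splitext(os.path.basename(image_oss_name))[0]
    if not re.fullmatch(r"\d+", stem):
        raise ValueError(f"unexpected frame name: {image_oss_name!r}")
    index = int(stem)  # the %04d output pattern starts numbering at 1
    return (index - 1) / fps


def video_name_from_path(image_oss_path: str) -> str:
    """Recover the video name from an image_oss_path like 'output_frames/<name>/'."""
    return image_oss_path.rstrip("/").split("/")[-1]
```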

Stage 2: Image labeling

Note

In large-scale production, we recommend requesting a higher API concurrency quota to avoid 429/403 throttling errors.
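Estimating call volume in advance can help when you request a quota. In the code in this document, each frame triggers one MultiModalConversation.call in stage 2. Each successfully labeled frame then triggers two MultiModalEmbedding.call requests in stage 3, one for the label text and one for the image. The sketch below turns video durations into approximate call counts. It estimates frames as duration × fps, which can be off by about one frame per video. The function name is hypothetical.

```python
import math


def estimate_api_calls(durations_s: list[float], fps: float = 2) -> dict:
    """Rough call counts for stages 2 and 3, assuming every frame is labeled."""
    frames = sum(math.ceil(d * fps) for d in durations_s)
    return {
        "frames": frames,
        "label_calls": frames,          # stage 2: one conversation call per frame
        "embedding_calls": 2 * frames,  # stage 3: text + image embedding per frame
    }
```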

@with_fs_mount(
    path=f"oss://{OSS_ENDPOINT}/{OSS_BUCKET}/{OSS_PATH}",
    mount_path="/mnt/data",
    storage_options={"role_arn": ROLE_ARN},
)
@with_python_requirements("dashscope>=1.24.6")
@with_running_options(engine="dpe", cpu=1, memory=4)
def labeling_chunk(chunk, api_key=None, model=LABEL_MODEL, prompt=None, max_tokens=250):
    import base64
    from http import HTTPStatus
    import dashscope

    # DashScope intranet endpoint (Beijing); must match the DPE network whitelist
    dashscope.base_http_api_url = "https://intranet-cn-beijing.dashscope.aliyuncs.com/api/v1"
    _api_key = api_key or DASHSCOPE_API_KEY
    _prompt = prompt or LABEL_PROMPT

    rows = []
    for _, row in chunk.iterrows():
        try:
            img_path = f"/mnt/data/{row['image_oss_path']}{row['image_oss_name']}"
            suffix = os.path.splitext(img_path)[1].lower()
            mime = {".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".png": "image/png"}.get(suffix, "image/jpeg")
            with open(img_path, "rb") as f:
                image_content = f"data:{mime};base64,{base64.b64encode(f.read()).decode()}"

            resp = dashscope.MultiModalConversation.call(
                api_key=_api_key,
                model=model,
                messages=[{"role": "user", "content": [{"image": image_content}, {"text": _prompt}]}],
                enable_thinking=False,
                max_tokens=max_tokens,
            )
            if resp.status_code != HTTPStatus.OK:
                raise RuntimeError(f"[{resp.status_code}] {resp.message}")

            label = resp.output["choices"][0]["message"]["content"][0]["text"].strip()
            rows.append(
                {
                    "image_oss_path": row["image_oss_path"],
                    "image_oss_name": row["image_oss_name"],
                    "label": label,
                    "status": "succeed",
                    "error_stage": "",
                    "error_msg": "",
                }
            )
        except Exception as e:
            rows.append(
                {
                    "image_oss_path": row["image_oss_path"],
                    "image_oss_name": row["image_oss_name"],
                    "label": "",
                    "status": "failed",
                    "error_stage": "label",
                    "error_msg": str(e),
                }
            )
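    # Explicit columns keep the schema stable even when a chunk is empty
    return pd.DataFrame(
        rows,
        columns=["image_oss_path", "image_oss_name", "label", "status", "error_stage", "error_msg"],
    )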

label_result = frame_result[["image_oss_path", "image_oss_name", "size"]].mf.apply_chunk(
    labeling_chunk,
    output_type="dataframe",
    dtypes={
        "image_oss_path": "object",
        "image_oss_name": "object",
        "label": "object",
        "status": "object",
        "error_stage": "object",
        "error_msg": "object",
    },
)
md.to_odps_table(label_result, "mf_video_label_result", overwrite=True).execute()
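labeling_chunk records a failed row on the first error, so a transient 429 response becomes a permanent failed status. One option is to wrap the API call in a retry with exponential backoff, as sketched below. The helper names and the default retry count are assumptions. The sketch retries only RuntimeError messages that start with "[429]", which is an assumption based on how labeling_chunk formats its errors. Adjust these choices to your quota and error handling.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def is_rate_limited(exc: Exception) -> bool:
    """Treat errors formatted as '[429] ...' (as in labeling_chunk) as retryable."""
    return isinstance(exc, RuntimeError) and str(exc).startswith("[429]")


def call_with_retry(
    fn: Callable[[], T],
    retries: int = 3,
    base_delay: float = 1.0,
    is_retryable: Callable[[Exception], bool] = is_rate_limited,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(); on retryable errors wait base_delay * 2**attempt (+ jitter), then retry."""
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception as exc:
            if attempt == retries or not is_retryable(exc):
                raise
            # Exponential backoff with up to 10% random jitter to spread out retries.
            delay = base_delay * (2 ** attempt)
            sleep(delay * (1 + 0.1 * random.random()))
    raise AssertionError("unreachable: the loop always returns or raises")
```

Inside labeling_chunk, you could move the dashscope.MultiModalConversation.call(...) call and its status check into a small closure and pass it to call_with_retry. That way, only the final failure is written as a failed row.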

Stage 3: Vectorization

@with_fs_mount(
    path=f"oss://{OSS_ENDPOINT}/{OSS_BUCKET}/{OSS_PATH}",
    mount_path="/mnt/data",
    storage_options={"role_arn": ROLE_ARN},
)
@with_python_requirements("dashscope>=1.24.6")
@with_running_options(engine="dpe", cpu=1, memory=4)
def embedding_chunk(chunk, api_key=None, model=EMBEDDING_MODEL, dimension=EMBEDDING_DIM):
    import base64
    from http import HTTPStatus
    import dashscope

    # DashScope intranet endpoint (Beijing); must match the DPE network whitelist
    dashscope.base_http_api_url = "https://intranet-cn-beijing.dashscope.aliyuncs.com/api/v1"
    _api_key = api_key or DASHSCOPE_API_KEY

    def _embed(input_data):
        resp = dashscope.MultiModalEmbedding.call(
            api_key=_api_key, model=model, input=[input_data], dimension=dimension
        )
        if resp.status_code != HTTPStatus.OK:
            raise RuntimeError(f"[{resp.status_code}] {resp.message}")
        return resp.output["embeddings"][0]["embedding"]

    def _validate(vector):
        vals = [float(v) for v in vector]
        if len(vals) != dimension:
            raise ValueError(f"dimension mismatch: expected {dimension}, got {len(vals)}")
        return vals

    rows = []
    for _, row in chunk.iterrows():
        try:
            img_path = f"/mnt/data/{row['image_oss_path']}{row['image_oss_name']}"
            suffix = os.path.splitext(img_path)[1].lower()
            mime = {".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".png": "image/png"}.get(suffix, "image/jpeg")
            with open(img_path, "rb") as f:
                image_content = f"data:{mime};base64,{base64.b64encode(f.read()).decode()}"

            label_emb = _validate(_embed({"text": row["label"]}))
            image_emb = _validate(_embed({"image": image_content}))
            rows.append(
                {
                    "image_oss_path": row["image_oss_path"],
                    "image_oss_name": row["image_oss_name"],
                    "label_embedding": json.dumps(label_emb, separators=(",", ":")),
                    "image_embedding": json.dumps(image_emb, separators=(",", ":")),
                    "status": "succeed",
                    "error_stage": "",
                    "error_msg": "",
                }
            )
        except Exception as e:
            rows.append(
                {
                    "image_oss_path": row["image_oss_path"],
                    "image_oss_name": row["image_oss_name"],
                    "label_embedding": "",
                    "image_embedding": "",
                    "status": "failed",
                    "error_stage": "embedding",
                    "error_msg": str(e),
                }
            )
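    # Explicit columns keep the schema stable even when a chunk is empty
    return pd.DataFrame(
        rows,
        columns=[
            "image_oss_path", "image_oss_name", "label_embedding", "image_embedding",
            "status", "error_stage", "error_msg",
        ],
    )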

# Embed only the frames that were labeled successfully in stage 2
emb_df = label_result[label_result["status"] == "succeed"][
    ["image_oss_path", "image_oss_name", "label"]
]
emb_result = emb_df.mf.apply_chunk(
    embedding_chunk,
    output_type="dataframe",
    dtypes={
        "image_oss_path": "object",
        "image_oss_name": "object",
        "label_embedding": "object",
        "image_embedding": "object",
        "status": "object",
        "error_stage": "object",
        "error_msg": "object",
    },
)
md.to_odps_table(emb_result, "mf_video_embedding_result", overwrite=True).execute()
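The stage 3 table stores label_embedding and image_embedding as compact JSON arrays, which makes similar-scene retrieval possible. The sketch below decodes those strings and ranks frames by cosine similarity to a query vector using NumPy. It assumes that the rows have already been loaded into a pandas DataFrame, for example by reading mf_video_embedding_result. It also assumes that the query vector comes from the same embedding model and dimension. Failed rows hold empty strings, so the sketch keeps only succeeded rows. The function name is hypothetical. For large collections, a vector index is likely more appropriate than this brute-force search.

```python
import json

import numpy as np
import pandas as pd


def top_k_similar(
    df: pd.DataFrame, query: np.ndarray, column: str = "image_embedding", k: int = 5
) -> pd.DataFrame:
    """Return the k frames whose embedding in `column` is most cosine-similar to `query`."""
    ok = df[df["status"] == "succeed"]
    if ok.empty:
        return pd.DataFrame(columns=["image_oss_path", "image_oss_name", "score"])
    mat = np.array([json.loads(s) for s in ok[column]], dtype=np.float64)
    q = np.asarray(query, dtype=np.float64)
    # Cosine similarity = dot product of L2-normalized vectors (epsilon avoids /0).
    mat_norm = mat / (np.linalg.norm(mat, axis=1, keepdims=True) + 1e-12)
    q_norm = q / (np.linalg.norm(q) + 1e-12)
    scores = mat_norm @ q_norm
    order = np.argsort(-scores)[:k]
    result = ok.iloc[order].copy()
    result["score"] = scores[order]
    return result[["image_oss_path", "image_oss_name", "score"]]
```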

Resource cleanup

print(f"LogView: {session.get_logview_address()}")
session.destroy()
print("Session destroyed.")