End-to-End Video Processing: Frame Extraction -> Labeling -> Vectorization

Available since MaxFrame 2.7.1

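Background

Autonomous vehicles continuously record large volumes of camera video during development and operation. The footage spans different road types, traffic flows, weather and lighting conditions, and complex events, and it is a key basis for model training, scenario mining, data closed loops, and evaluation.

However, raw video usually carries no structured labels, and screening and organizing it by hand is expensive. High-value samples are therefore hard to find, and data is rarely reused. To manage and use large volumes of video more efficiently, the videos need frame-level understanding and standardized processing:

  1. Frame extraction: use FFmpeg to sample frames as images at a specified frame rate.

  2. Image labeling: call a multimodal large model from Alibaba Cloud Model Studio (Bailian) to generate scene labels and semantic descriptions automatically.

  3. Vectorization: extract embedding vectors that form a unified feature base for retrieval, clustering, sample selection, and analysis.

This solution uses the distributed processing capability of MaxFrame to batch-process videos end to end, turning unstructured video into data assets that can be searched, analyzed, and reused.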

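Use cases

  • Automatic labeling and management of large video collections.

  • Similar-scene retrieval and sample recall.

  • Discovery of abnormal events and long-tail scenarios.

  • Clustering analysis and data distribution assessment.

  • Support for training, evaluation, and data closed loops.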

Core workflow

Figure: Video understanding and vectorization workflow

Prerequisites

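  1. Activate MaxCompute: you need an activated MaxCompute project and a valid AccessKey ID and AccessKey secret.

  2. Enable the DPE engine: before running this example, submit a ticket to enable the DPE engine for the current MaxCompute project.

  3. Upload videos to OSS: the source videos are stored in the target OSS bucket.

  4. Authorize an OSS RAM role: configure the role ARN used to access OSS through the file system mount.

  5. Model compute service and inference quota: purchase or activate the model compute service, and make sure an inference quota is associated before running Model Studio model inference.

  6. Custom image with FFmpeg: Stage 1 (frame extraction) needs a MaxCompute custom image that contains FFmpeg. The image must be in the same region as the MaxCompute project.

  7. MaxFrame SDK version: use MaxFrame SDK 2.7.1 or later (pip install "maxframe>=2.7.1"; quote the requirement so that the shell does not treat > as an output redirect).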
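To check the SDK requirement from a script, you could compare the installed version against 2.7.1. The helper below is a hypothetical sketch, not part of MaxFrame, and it only handles plain numeric versions such as `2.7.1`.

```python
from importlib import metadata


def meets_min_version(installed: str, minimum: str = "2.7.1") -> bool:
    """Compare dotted numeric versions, e.g. "2.10.0" >= "2.7.1".

    Assumes purely numeric components; pre-release tags such as
    "2.8.0rc1" are not handled by this sketch.
    """
    def parts(version: str) -> tuple:
        nums = [int(p) for p in version.split(".")]
        # Pad so that "2.7" compares equal to "2.7.0".
        return tuple(nums + [0] * (3 - len(nums)))

    return parts(installed) >= parts(minimum)


if __name__ == "__main__":
    try:
        version = metadata.version("maxframe")
    except metadata.PackageNotFoundError:
        print('maxframe is not installed: pip install "maxframe>=2.7.1"')
    else:
        print(version, "OK" if meets_min_version(version) else "too old")
```

Comparing integer tuples rather than raw strings avoids the classic mistake where "2.10.0" sorts before "2.7.1".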

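Model compute service and inference quota

In the MaxCompute console, purchase or activate the model compute service, and make sure an inference quota is associated before running Model Studio model inference.

Figure: Model compute service and inference quota page in the MaxCompute console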
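Stages 2 and 3 process one frame per row, so the number of extracted frames is a reasonable proxy for how much inference quota a run needs. The sketch below is a rough estimate under the assumption that FFmpeg's `fps` filter emits about `duration × fps` frames per video; the exact count can differ by a frame because of rounding. `estimate_model_calls` is a hypothetical helper.

```python
import math


def estimate_model_calls(durations_sec, frame_fps=2):
    """Rough upper bound on model calls for Stages 2 and 3.

    durations_sec: per-video durations in seconds (for example from ffprobe).
    frame_fps: the frame_fps used in Stage 1 (2 in this example).
    Stage 2 labels every extracted frame; Stage 3 embeds only frames that
    were labeled successfully, so embedding_calls is an upper bound.
    """
    frames = sum(math.ceil(d * frame_fps) for d in durations_sec if d > 0)
    return {"frames": frames, "label_calls": frames, "embedding_calls": frames}


print(estimate_model_calls([60.0, 30.5, 12.2]))
# -> {'frames': 206, 'label_calls': 206, 'embedding_calls': 206}
```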

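Custom image

Why is a custom image needed?

The default DPE runtime contains only Python and does not include FFmpeg, but Stage 1 (frame extraction) calls the FFmpeg executables. For quick validation, you can reuse an FFmpeg image in the same region that has already been verified. For production, build and publish your own image in the same region as the MaxCompute project, and then set odps.session.image to that image name.

Sample Dockerfile: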

# syntax=docker/dockerfile:1.6
FROM --platform=linux/amd64 ubuntu:22.04

ENV TZ=Asia/Shanghai \
    LANG=en_US.UTF-8 \
    LANGUAGE=en_US:en \
    LC_ALL=en_US.UTF-8 \
    PYTHONIOENCODING=utf-8 \
    TERM=xterm-256color \
    MF_PYTHON_EXECUTABLE=/usr/bin/python3

RUN sed -i \
        -e 's|http://archive.ubuntu.com/ubuntu/|http://mirrors.aliyun.com/ubuntu/|g' \
        -e 's|http://security.ubuntu.com/ubuntu/|http://mirrors.aliyun.com/ubuntu/|g' \
        /etc/apt/sources.list \
    && apt-get update \
    && DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \
        ca-certificates \
        tzdata \
        locales \
        rpm \
        sudo \
        vim \
        git \
        wget \
        curl \
        pkg-config \
        build-essential \
        gdb \
        binutils \
        telnet \
        netcat-openbsd \
        iputils-ping \
        python3 \
        python3-dev \
        python3-pip \
        python3-venv \
        ffmpeg \
        libgfortran5 \
        libgomp1 \
        libnss3 \
        libsqlite3-0 \
        libssl3 \
        zlib1g \
    && locale-gen en_US.UTF-8 \
    && rm -rf /var/lib/apt/lists/*

RUN python3 -m pip install --no-cache-dir --upgrade pip setuptools wheel \
    && python3 -m pip install --no-cache-dir \
        cloudpickle \
        numpy \
        pandas \
        requests

RUN groupadd -g 505 admin \
    && useradd -u 505 -g admin -m -d /home/admin -s /bin/bash admin \
    && mkdir -p /workspace \
    && chown -R admin:admin /home/admin /workspace \
    && update-alternatives --install /usr/bin/python python /usr/bin/python3 100 \
    && update-alternatives --install /usr/bin/pip pip /usr/bin/pip3 100

WORKDIR /workspace

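# Build-time sanity checks: the C library defines the GLIBC_2.34 symbol
# version (glibc 2.34 or newer), FFmpeg and FFprobe run, and Python can
# import the modules the UDFs rely on.
RUN getconf GNU_LIBC_VERSION \
    && strings /usr/lib/x86_64-linux-gnu/libc.so.6 | grep -q '^GLIBC_2\.34$' \
    && ffmpeg -version >/dev/null \
    && ffprobe -version >/dev/null \
    && "$MF_PYTHON_EXECUTABLE" -c "import ssl, sqlite3, numpy, pandas, requests, cloudpickle; print('runtime ok')"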
CMD ["/bin/bash"]
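The build-time checks only prove that the image is correct, not that the workers actually run in it (for example, if odps.session.image points at the wrong name). One way to check at run time is to call a small probe from inside a UDF body. The function below is a plain-Python sketch with a hypothetical name; how you invoke it from MaxFrame (for example through `mf.apply_chunk`, as Stage 1 does) is left open.

```python
import shutil
import subprocess


def check_ffmpeg_runtime(timeout_sec: int = 30) -> dict:
    """Report whether ffmpeg/ffprobe are on PATH, with their version line."""
    report = {}
    for tool in ("ffmpeg", "ffprobe"):
        path = shutil.which(tool)
        if path is None:
            report[tool] = "missing"
            continue
        out = subprocess.run(
            [path, "-version"], capture_output=True, text=True,
            timeout=timeout_sec, check=False,
        )
        # The first line typically starts with "ffmpeg version ..." or
        # "ffprobe version ...".
        first = out.stdout.splitlines()[0] if out.stdout else ""
        report[tool] = first or f"exit code {out.returncode}"
    return report


print(check_ffmpeg_runtime())
```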

Environment setup and session creation

import glob
import json
import math
import os
import subprocess

import pandas as pd
import maxframe.dataframe as md
from maxframe.config import options
from maxframe.learn.contrib.llm import ImageContentType
from maxframe.learn.utils import read_odps_model
from maxframe.session import new_session
from maxframe.udf import with_fs_mount, with_running_options
from odps import ODPS

# Run DPE workers in the custom image that provides FFmpeg.
options.sql.settings = {"odps.session.image": "<your_ffmpeg_image_name>"}
options.dag.settings = {"engine_order": ["DPE", "MCSQL"]}

o = ODPS(
    access_id="<your_access_id>",
    secret_access_key="<your_secret_access_key>",
    project="<your_maxcompute_project>",
    endpoint="https://service.<region>.maxcompute.aliyun.com/api",
)
session = new_session(o)
print(f"Session ID : {session.session_id}")
print(f"LogView    : {session.get_logview_address()}")

Global parameters:

OSS_ENDPOINT = "<your_oss_endpoint>"
OSS_REGION = "<your_oss_region>"
OSS_BUCKET = "<your_oss_bucket>"
OSS_PREFIX = "<your_video_prefix>/".strip("/")
OSS_ROOT = f"oss://{OSS_ENDPOINT}/{OSS_BUCKET}"
if OSS_PREFIX:
    OSS_ROOT = f"{OSS_ROOT}/{OSS_PREFIX}"
OSS_MOUNT_PATH = "/mnt/data"
OSS_ROLE_ARN = "acs:ram::<account_id>:role/<your_role_name>"

OSS_ACCESS_KEY_ID = "<your_oss_access_key_id>"
OSS_ACCESS_KEY_SECRET = "<your_oss_access_key_secret>"
STORAGE_OPTIONS = {
    "access_key_id": OSS_ACCESS_KEY_ID,
    "access_key_secret": OSS_ACCESS_KEY_SECRET,
}

VIDEO_INPUT_TABLE = "mf_video_input"
FRAME_OUTPUT_TABLE = "mf_video_frame_result"
LABEL_OUTPUT_TABLE = "mf_video_label_result"
EMB_OUTPUT_TABLE = "mf_video_embedding_result"

MODEL_PROJECT = "bigdata_public_modelset"

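Before submitting jobs, make sure the mounted OSS path is correct. For example, if your OSS address is oss://oss-cn-hangzhou.aliyuncs.com/jingxuan-oss-test-hz/, set OSS_ENDPOINT = "oss-cn-hangzhou.aliyuncs.com" and OSS_BUCKET = "jingxuan-oss-test-hz", and set OSS_PREFIX to the subdirectory that stores the videos. OSS_REGION is the region ID passed to the OSS SDK in Stage 0, for example cn-hangzhou for this endpoint.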
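If you start from a full OSS URL, a small helper can derive the three settings. `split_oss_root` is a hypothetical helper, not part of MaxFrame or PyODPS; it assumes the `oss://<endpoint>/<bucket>/<prefix>` layout used throughout this example, and the `videos` subdirectory in the usage line is made up for illustration.

```python
def split_oss_root(url: str):
    """Split "oss://<endpoint>/<bucket>/<prefix>/" into (endpoint, bucket, prefix)."""
    scheme = "oss://"
    if not url.startswith(scheme):
        raise ValueError(f"not an oss:// URL: {url!r}")
    parts = url[len(scheme):].strip("/").split("/", 2)
    if len(parts) < 2 or not all(parts[:2]):
        raise ValueError(f"expected oss://<endpoint>/<bucket>[/<prefix>]: {url!r}")
    endpoint, bucket = parts[0], parts[1]
    # The prefix may contain several levels, e.g. "2024/cam_front".
    prefix = parts[2].strip("/") if len(parts) == 3 else ""
    return endpoint, bucket, prefix


endpoint, bucket, prefix = split_oss_root(
    "oss://oss-cn-hangzhou.aliyuncs.com/jingxuan-oss-test-hz/videos/"
)
print(endpoint, bucket, prefix)
# -> oss-cn-hangzhou.aliyuncs.com jingxuan-oss-test-hz videos
```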

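Stage 0: Build the video manifest table

If you already have an ODPS table with a video_path column, you can skip this stage. Otherwise, use PyODPS and alibabacloud_oss_v2 to scan the OSS prefix and write one row per video. Stage 1 reads this table directly.

The video_path values must use exactly the same prefix as OSS_ROOT (oss://<endpoint>/<bucket>/<prefix>/...). Stage 1 maps each path onto the with_fs_mount mount point by replacing that prefix. If the prefix differs (for example, an internal endpoint in video_path but a public endpoint in OSS_ROOT), the path cannot be mapped, and the row is recorded as failed with error_msg path_outside_root instead of raising an error.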

import alibabacloud_oss_v2 as oss

VIDEO_EXTS = (".mp4", ".avi", ".mov", ".mkv")


def build_video_meta(o: ODPS) -> None:
    cred = oss.credentials.StaticCredentialsProvider(
        OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET,
    )
    cfg = oss.config.load_default()
    cfg.credentials_provider = cred
    cfg.region = OSS_REGION
    cfg.endpoint = OSS_ENDPOINT
    client = oss.Client(cfg)

    rows = []
    list_prefix = f"{OSS_PREFIX}/" if OSS_PREFIX else ""
    paginator = client.list_objects_v2_paginator()
    for page in paginator.iter_page(
        oss.ListObjectsV2Request(bucket=OSS_BUCKET, prefix=list_prefix),
    ):
        for obj in page.contents or []:
            key = obj.key
            if not key.lower().endswith(VIDEO_EXTS):
                continue
            rows.append({
                "video_path": f"oss://{OSS_ENDPOINT}/{OSS_BUCKET}/{key}",
                "size_bytes": int(obj.size or 0),
                "last_modified": str(obj.last_modified or ""),
            })

    print(f"Found {len(rows)} videos under {OSS_ROOT}")
    if not rows:
        raise RuntimeError("no videos found under OSS_PREFIX")

    o.delete_table(VIDEO_INPUT_TABLE, if_exists=True)
    o.create_table(
        VIDEO_INPUT_TABLE,
        schema="video_path string, size_bytes bigint, last_modified string",
        if_not_exists=True,
    )
    with o.get_table(VIDEO_INPUT_TABLE).open_writer() as w:
        w.write([
            (r["video_path"], r["size_bytes"], r["last_modified"])
            for r in rows
        ])

    print(f"Wrote {len(rows)} rows to {o.project}.{VIDEO_INPUT_TABLE}")


build_video_meta(o)
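The prefix rule above is easy to test locally before submitting a job. The function below mirrors the `startswith` check and the `replace(..., 1)` call in `frame_extraction_chunk`; the name `to_mount_path` and the bucket `my-bucket` are hypothetical.

```python
from typing import Optional


def to_mount_path(video_path: str, oss_root: str,
                  mount_path: str = "/mnt/data") -> Optional[str]:
    """Map an OSS object path under oss_root onto the with_fs_mount mount point.

    Returns None when video_path is not under oss_root; Stage 1 records such
    rows as failed with error_msg "path_outside_root".
    """
    root = oss_root.rstrip("/") + "/"
    if not video_path.startswith(root):
        return None
    return mount_path.rstrip("/") + "/" + video_path[len(root):]


root = "oss://oss-cn-hangzhou.aliyuncs.com/my-bucket/videos"
print(to_mount_path(f"{root}/day1/clip.mp4", root))
# -> /mnt/data/day1/clip.mp4
print(to_mount_path(
    "oss://oss-cn-hangzhou-internal.aliyuncs.com/my-bucket/videos/clip.mp4", root,
))
# -> None  (different endpoint string, so the prefix does not match)
```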

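Stage 1: Video frame extraction

Stage 1 uses mf.apply_chunk to distribute the video manifest table, in which each row is one video, across DPE workers. Inside each worker, with_fs_mount mounts the OSS prefix locally. FFprobe checks the video duration, FFmpeg samples JPEG frames at frame_fps = 2 frames per second, and the frames are written back to <OSS_ROOT>/frames/.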
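The FFprobe check reads the container duration as plain text. Its parsing and validation rules (a float that is finite and greater than zero) can be isolated in a pure function and tested locally; `parse_ffprobe_duration` is a hypothetical name, and the rules are the ones Stage 1 applies.

```python
import math


def parse_ffprobe_duration(stdout: str) -> float:
    """Parse the output of
    ffprobe -v error -show_entries format=duration
            -of default=noprint_wrappers=1:nokey=1 <file>

    ffprobe normally prints a single number such as "12.345000"; it may print
    "N/A" when the container has no duration. Raises
    ValueError("invalid_duration") for anything that is not a usable duration.
    """
    try:
        duration = float(stdout.strip())
    except ValueError as exc:
        raise ValueError("invalid_duration") from exc
    # Reject nan/inf as well as empty or zero-length videos.
    if not math.isfinite(duration) or duration <= 0:
        raise ValueError("invalid_duration")
    return duration
```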

The output schema follows the AI FUNC image input contract, so downstream stages can use image_id and image_url directly without renaming columns.

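Column (type): description

  • video_path (string): OSS path of the source video, kept for lineage tracing.

  • frame_idx (bigint): Frame index; -1 for failed rows.

  • image_id (string): <video_basename>_<frame_idx:04d>.

  • image_url (string): OSS URL of the frame JPEG, usable directly by AI FUNC.

  • status (string): ok or failed.

  • error_stage (string): Name of the stage that failed.

  • error_msg (string): Failure reason.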
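For a successful frame, image_id and image_url follow directly from the video path and the frame index. The sketch below reproduces that naming convention outside MaxFrame; `frame_record` and `failed_record` are hypothetical helpers, and the failed-row shape matches the `except` branch of Stage 1.

```python
def frame_record(video_path: str, frame_idx: int, oss_root: str) -> dict:
    """Build one Stage 1 output row for an extracted frame."""
    # Same basename rule as Stage 1: drop the last extension only.
    video_basename = video_path.rsplit("/", 1)[-1].rsplit(".", 1)[0]
    image_id = f"{video_basename}_{frame_idx:04d}"
    return {
        "video_path": video_path,
        "frame_idx": frame_idx,
        "image_id": image_id,
        "image_url": f"{oss_root}/frames/{image_id}.jpg",
        "status": "ok",
        "error_stage": "",
        "error_msg": "",
    }


def failed_record(video_path: str, exc: Exception) -> dict:
    """Build the single row Stage 1 emits when a whole video fails."""
    return {
        "video_path": video_path,
        "frame_idx": -1,
        "image_id": "",
        "image_url": "",
        "status": "failed",
        "error_stage": "frame_extraction",
        "error_msg": str(exc)[:512],
    }


row = frame_record("oss://e/b/v/clip.mp4", 3, "oss://e/b/v")
print(row["image_id"], row["image_url"])
# -> clip_0003 oss://e/b/v/frames/clip_0003.jpg
```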

@with_running_options(engine="dpe", cpu=2, memory=8)
@with_fs_mount(
    path=OSS_ROOT,
    mount_path=OSS_MOUNT_PATH,
    storage_options={"role_arn": OSS_ROLE_ARN},
)
def frame_extraction_chunk(chunk: pd.DataFrame) -> pd.DataFrame[
    {"video_path": "object", "frame_idx": "int64", "image_id": "object", "image_url": "object",
     "status": "object", "error_stage": "object", "error_msg": "object"}
]:
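    rows = []
    frame_fps = 2              # frames extracted per second of video
    ffprobe_timeout_sec = 60
    ffmpeg_timeout_sec = 300

    def list_frames(output_dir, prefix):
        # Only "<prefix><digits>.jpg" belongs to the current video. A bare
        # "<prefix>*" glob would also match frames of another video whose name
        # starts with the same prefix (e.g. "clip_2.mp4" for "clip.mp4").
        # Sorting by (length, name) keeps frame 10000 after frame 9999.
        names = [
            f for f in os.listdir(output_dir)
            if f.startswith(prefix) and f.endswith(".jpg")
            and f[len(prefix):-len(".jpg")].isdigit()
        ]
        return sorted(names, key=lambda f: (len(f), f))

    for _, row in chunk.iterrows():
        video_path = row["video_path"]
        try:
            # Only paths under OSS_ROOT can be mapped onto the mount point.
            if not video_path.startswith(f"{OSS_ROOT}/"):
                raise ValueError("path_outside_root")

            local_video_path = video_path.replace(
                f"{OSS_ROOT}/", f"{OSS_MOUNT_PATH}/", 1,
            )
            local_output_dir = os.path.join(OSS_MOUNT_PATH, "frames")
            os.makedirs(local_output_dir, exist_ok=True)

            # Frames are named <video_basename>_NNNN.jpg in one shared folder,
            # so video basenames must be unique under OSS_PREFIX.
            video_name = video_path.rsplit("/", 1)[1]
            video_basename = video_name.rsplit(".", 1)[0]
            frame_prefix = f"{video_basename}_"
            local_output_pattern = os.path.join(
                local_output_dir, f"{frame_prefix}%04d.jpg",
            )

            # Reject unreadable or zero-length videos before decoding them.
            probe = subprocess.run(
                [
                    "ffprobe", "-v", "error",
                    "-show_entries", "format=duration",
                    "-of", "default=noprint_wrappers=1:nokey=1",
                    local_video_path,
                ],
                capture_output=True, text=True,
                timeout=ffprobe_timeout_sec, check=False,
            )
            if probe.returncode != 0:
                raise RuntimeError(f"ffprobe_failed: {probe.stderr.strip()[:200]}")
            try:
                duration = float(probe.stdout.strip())
            except ValueError as exc:
                raise ValueError("invalid_duration") from exc
            if not math.isfinite(duration) or duration <= 0:
                raise ValueError("invalid_duration")

            # Remove frames left over from an earlier run of the same video;
            # otherwise a shorter re-extraction would leave stale frames behind.
            for stale in list_frames(local_output_dir, frame_prefix):
                os.remove(os.path.join(local_output_dir, stale))

            # Sample frame_fps frames per second; -start_number 0 numbers the
            # output files from 0 so that they line up with frame_idx.
            ff = subprocess.run(
                [
                    "ffmpeg", "-y",
                    "-i", local_video_path,
                    "-vf", f"fps={frame_fps}",
                    "-q:v", "2",
                    "-start_number", "0",
                    local_output_pattern,
                ],
                capture_output=True, text=True,
                timeout=ffmpeg_timeout_sec, check=False,
            )
            if ff.returncode != 0:
                raise RuntimeError(f"ffmpeg_failed: {ff.stderr.strip()[:200]}")

            frame_files = list_frames(local_output_dir, frame_prefix)
            if not frame_files:
                raise ValueError("no_frames_found")

            # Derive frame_idx, image_id and image_url from the actual file names.
            for frame_name in frame_files:
                image_id = frame_name[:-len(".jpg")]
                rows.append({
                    "video_path": video_path,
                    "frame_idx": int(image_id[len(frame_prefix):]),
                    "image_id": image_id,
                    "image_url": f"{OSS_ROOT}/frames/{frame_name}",
                    "status": "ok",
                    "error_stage": "",
                    "error_msg": "",
                })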
        except Exception as exc:
            rows.append({
                "video_path": video_path,
                "frame_idx": -1,
                "image_id": "",
                "image_url": "",
                "status": "failed",
                "error_stage": "frame_extraction",
                "error_msg": str(exc)[:512],
            })
    return pd.DataFrame(rows)


video_df = md.read_odps_table(VIDEO_INPUT_TABLE)
frame_df = video_df.mf.apply_chunk(frame_extraction_chunk)

md.to_odps_table(frame_df, FRAME_OUTPUT_TABLE, overwrite=True).execute()
print(o.get_table(FRAME_OUTPUT_TABLE).to_df().head(10))
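A failed video produces one row with frame_idx = -1, and its error_msg starts with a short reason code such as `path_outside_root`, `invalid_duration`, `ffprobe_failed`, `ffmpeg_failed` or `no_frames_found`. After loading the failed rows of the Stage 1 table into pandas, a tally like the sketch below shows which reason dominates. `failure_reasons` is a hypothetical helper; messages without such a code (for example a subprocess timeout) fall into "other".

```python
from collections import Counter


def failure_reasons(error_msgs):
    """Count failures by reason code, i.e. the text before the first ':'.

    Messages without a short code, such as a subprocess timeout whose text
    starts with "Command '[...]' timed out", are grouped as "other".
    """
    counts = Counter()
    for msg in error_msgs:
        code = (msg or "").split(":", 1)[0].strip()
        counts[code if code and " " not in code else "other"] += 1
    return counts


# Hypothetical usage, with failed_df holding the status == "failed" rows:
# print(failure_reasons(failed_df["error_msg"]).most_common())
```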

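Stage 2: Image labeling

Stage 2 uses AI FUNC to load the multimodal model qwen3.6-plus and generates a structured description for each frame that was extracted successfully (status = ok).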

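# Prompt (in Chinese): describe the road scene in four aspects (road
# environment, weather and lighting, traffic participants, key events) and
# answer in Chinese as JSON with the fields scene_type, weather, lighting,
# road_type, key_objects, risk_level and description.
label_prompt = (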
    "这是一张自动驾驶车辆摄像头采集的道路场景图片。请对画面内容进行结构化描述,包括以下方面:"
    "1)道路环境:道路类型(高速/城区/乡村)、车道数、路面状况、交通标志标线;"
    "2)天气与光照:天气状况(晴/阴/雨/雪/雾)、光照条件(白天/夜间/逆光/隧道);"
    "3)交通参与者:车辆类型与数量、行人、非机动车、特殊车辆(工程车/公交等);"
    "4)关键事件:是否存在异常行为(急刹、变道、闯红灯)、潜在风险、长尾场景。"
    "请用中文输出 JSON 格式,字段包括:scene_type, weather, lighting, road_type, "
    "key_objects, risk_level, description。"
)

vlm_model = read_odps_model("qwen3.6-plus", project=MODEL_PROJECT)
cp = vlm_model.content_part

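# Read Stage 1 back from its output table instead of reusing frame_df, so
# this DAG does not depend on frame_df's intermediate result still being
# cached (otherwise frame extraction might run again).
frame_src = md.read_odps_table(FRAME_OUTPUT_TABLE)
frame_ok = frame_src[frame_src["status"] == "ok"][
    ["video_path", "frame_idx", "image_id", "image_url"]
]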

label_df = vlm_model.generate(
    frame_ok,
    messages=[
        {
            "role": "user",
            "content": [
                cp.text(label_prompt),
                cp.image(
                    data=frame_ok["image_url"],
                    type=ImageContentType.IMAGE_URL,
                    storage_options=STORAGE_OPTIONS,
                ),
            ]
        }
    ],
    simple_output=False,
    params={"max_tokens": 1024},
    running_options={"enable_real_rpm_stats": True},
)

label_combined = md.concat(
    [
        frame_ok,
        label_df.rename(columns={
            "response": "label_response",
            "success": "label_success",
        }),
    ],
    axis=1,
)


def parse_label(chunk: pd.DataFrame) -> pd.DataFrame[
    {"video_path": "object", "frame_idx": "int64", "image_id": "object", "image_url": "object",
     "label_text": "object", "label_input_token": "int64", "label_output_token": "int64",
     "status": "object", "error_stage": "object", "error_msg": "object"}
]:
    rows = []
    for _, r in chunk.iterrows():
        base = {
            "video_path": r["video_path"],
            "frame_idx": int(r["frame_idx"]),
            "image_id": r["image_id"],
            "image_url": r["image_url"],
            "label_text": None,
            "label_input_token": 0,
            "label_output_token": 0,
            "status": "ok",
            "error_stage": "",
            "error_msg": "",
        }
        if not r["label_success"]:
            base.update(
                status="failed",
                error_stage="label",
                error_msg=str(r["label_response"])[:512],
            )
            rows.append(base)
            continue
        try:
            resp = (
                json.loads(r["label_response"])
                if isinstance(r["label_response"], str)
                else r["label_response"]
            )
            base["label_text"] = resp["choices"][0]["message"]["content"]
            usage = resp.get("usage", {}) or {}
            base["label_input_token"] = int(usage.get("prompt_tokens", 0) or 0)
            base["label_output_token"] = int(
                usage.get("completion_tokens", 0) or 0
            )
        except Exception as exc:
            base.update(
                status="failed",
                error_stage="label",
                error_msg=f"label_parse: {exc}",
            )
        rows.append(base)
    return pd.DataFrame(rows)


label_result = label_combined.mf.apply_chunk(parse_label)

md.to_odps_table(label_result, LABEL_OUTPUT_TABLE, overwrite=True).execute()
print(o.get_table(LABEL_OUTPUT_TABLE).to_df().head(5))
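The prompt asks the model for JSON with the fields scene_type, weather, lighting, road_type, key_objects, risk_level and description, but `parse_label` stores the raw reply in label_text. Models often wrap JSON in a Markdown code fence or add a sentence around it, so a tolerant parser helps when you later query those fields. This is a sketch based on an assumption about the reply format, not a guarantee of what the model returns; the function names are hypothetical.

```python
import json
from typing import Optional

# Field names requested by label_prompt.
EXPECTED_KEYS = ("scene_type", "weather", "lighting", "road_type",
                 "key_objects", "risk_level", "description")


def extract_label_json(label_text: Optional[str]) -> Optional[dict]:
    """Return the JSON object embedded in a model reply, or None."""
    if not label_text:
        return None
    # Take the outermost {...} span. This skips a surrounding Markdown code
    # fence (e.g. a "json" fence) and any prose before or after the object.
    start, end = label_text.find("{"), label_text.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        obj = json.loads(label_text[start:end + 1])
    except json.JSONDecodeError:
        return None
    return obj if isinstance(obj, dict) else None


def missing_keys(obj: Optional[dict]) -> list:
    """List the expected fields that the parsed label does not contain."""
    if obj is None:
        return list(EXPECTED_KEYS)
    return [k for k in EXPECTED_KEYS if k not in obj]
```

Rows where `missing_keys` is non-empty are good candidates for re-labeling or for a stricter prompt.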

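Stage 3: Image vectorization

Stage 3 loads a multimodal embedding model (qwen3-vl-embedding by default) and generates an image embedding for each frame that was labeled successfully. The vectors are requested with dimension = embedding_dim (1024) and stored as JSON array strings in the image_embedding column.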

embedding_dim = 1024
embedding_model = read_odps_model("qwen3-vl-embedding", project=MODEL_PROJECT)
emb_cp = embedding_model.content_part

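# Read the persisted Stage 2 output rather than reusing label_result, so the
# labeling calls are not re-issued if label_result would be recomputed.
label_src = md.read_odps_table(LABEL_OUTPUT_TABLE)
emb_input = label_src[label_src["status"] == "ok"][
    ["video_path", "frame_idx", "image_id", "image_url", "label_text"]
]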

image_emb_df = embedding_model.embed(
    emb_input,
    input=[
        emb_cp.image(
            data=emb_input["image_url"],
            type=ImageContentType.IMAGE_URL,
            storage_options=STORAGE_OPTIONS,
        ),
    ],
    simple_output=False,
    params={"timeout": 120, "dimension": embedding_dim},
    running_options={"enable_real_rpm_stats": True},
)

emb_combined = md.concat(
    [
        emb_input,
        image_emb_df.rename(columns={
            "response": "image_embedding_response",
            "success": "image_embedding_success",
        }),
    ],
    axis=1,
)


def parse_embedding(chunk: pd.DataFrame) -> pd.DataFrame[
    {"video_path": "object", "frame_idx": "int64", "image_id": "object", "image_url": "object",
     "label_text": "object", "image_embedding": "object",
     "image_embedding_total_token": "int64",
     "status": "object", "error_stage": "object", "error_msg": "object"}
]:
    rows = []
    for _, r in chunk.iterrows():
        base = {
            "video_path": r["video_path"],
            "frame_idx": int(r["frame_idx"]),
            "image_id": r["image_id"],
            "image_url": r["image_url"],
            "label_text": r["label_text"],
            "image_embedding": None,
            "image_embedding_total_token": 0,
            "status": "ok",
            "error_stage": "",
            "error_msg": "",
        }
        if not r["image_embedding_success"]:
            base.update(
                status="failed",
                error_stage="image_embedding",
                error_msg=str(r["image_embedding_response"])[:512],
            )
            rows.append(base)
            continue
        try:
            resp = (
                json.loads(r["image_embedding_response"])
                if isinstance(r["image_embedding_response"], str)
                else r["image_embedding_response"]
            )
            embedding = resp["output"]["embeddings"][0]["embedding"]
            base["image_embedding"] = json.dumps(
                embedding, ensure_ascii=False, separators=(",", ":"),
            )
            usage = resp.get("usage", {}) or {}
            base["image_embedding_total_token"] = int(
                usage.get("total_tokens", 0) or 0
            )
        except Exception as exc:
            base.update(
                status="failed",
                error_stage="image_embedding",
                error_msg=f"emb_parse: {exc}",
            )
        rows.append(base)
    return pd.DataFrame(rows)

emb_result = emb_combined.mf.apply_chunk(parse_embedding)

md.to_odps_table(emb_result, EMB_OUTPUT_TABLE, overwrite=True).execute()
print(o.get_table(EMB_OUTPUT_TABLE).to_df().head(5))
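Because image_embedding is a compact JSON array string, a small sample pulled to the client can be used to sanity-check similar-scene retrieval by ranking frames on cosine similarity. This brute-force sketch is only suitable for small samples; the full table would need a vector index or a distributed approach. All function names are hypothetical.

```python
import json
import math


def decode_embedding(s: str) -> list:
    """Decode the compact JSON array string written by parse_embedding."""
    return [float(x) for x in json.loads(s)]


def cosine(a, b) -> float:
    """Cosine similarity of two equal-length vectors (0.0 for a zero vector)."""
    if len(a) != len(b):
        raise ValueError(f"dimension mismatch: {len(a)} vs {len(b)}")
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k_similar(query_id, rows, k=5):
    """Rank frames by cosine similarity to query_id.

    rows: iterable of (image_id, image_embedding) pairs, where image_embedding
    is the JSON string from the image_embedding column.
    Returns up to k (image_id, score) pairs, most similar first.
    """
    vectors = {image_id: decode_embedding(emb) for image_id, emb in rows}
    query = vectors[query_id]
    scored = [
        (image_id, cosine(query, vec))
        for image_id, vec in vectors.items()
        if image_id != query_id
    ]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]
```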

Resource cleanup

print(f"LogView: {session.get_logview_address()}")
session.destroy()
print("Session destroyed.")
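If any stage raises an exception, the cleanup lines above are never reached and the session stays alive. One option is to wrap the work in `try`/`finally`; the context manager below is a hypothetical sketch that relies only on the `destroy()` method already used above.

```python
from contextlib import contextmanager


@contextmanager
def destroy_on_exit(session):
    """Yield the session and always call session.destroy(), even on errors."""
    try:
        yield session
    finally:
        session.destroy()


# Usage with the session from new_session(o):
# with destroy_on_exit(new_session(o)) as session:
#     ...  # run Stages 0-3
```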