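End-to-end video processing: frame extraction -> labeling -> vectorization#
Background#
Autonomous vehicles continuously collect massive volumes of camera video during development and operation. This data covers different road types, traffic flows, weather and lighting conditions, and complex events, and it underpins model training, scenario mining, the data closed loop, and performance evaluation.
However, raw video usually lacks structured labels, and manual screening and curation are costly, so high-value samples are hard to find and data is reused inefficiently. To manage and use massive video data more efficiently, the video needs frame-level understanding and standardized processing:
Frame extraction: use FFmpeg to extract frame images at a specified frame rate.
Image labeling: call a Bailian multimodal large model to generate scene labels and semantic descriptions automatically.
Vectorization: extract embedding vectors to form a unified feature base for retrieval, clustering, sample selection, and analysis.
This solution relies on the distributed processing capability of MaxFrame to batch-process video data end to end, turning unstructured video into data assets that can be searched, analyzed, and reused.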
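Applicable scenarios#
Automatic labeling and management of massive video data.
Similar-scenario retrieval and sample recall.
Discovery of abnormal events and long-tail scenarios.
Cluster analysis and distribution evaluation of data.
Support for training, evaluation, and the data closed loop.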
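Core workflow#
Prerequisites#
| # | Prerequisite | Description |
|---|---|---|
| 1 | Activate MaxCompute | An activated MaxCompute project and a valid Access ID / Access Key are required. |
| 2 | Enable the DPE engine | Before running this example, submit a ticket to enable the DPE engine for the current MaxCompute project. |
| 3 | Upload videos to OSS | The raw videos are already stored in the target OSS bucket. |
| 4 | Authorize an OSS RAM role | Configure the Role ARN used for OSS mount access. |
| 5 | Model computing service and inference quota | Purchase or activate the model computing service, and confirm that an inference quota is associated before running Bailian model inference. |
| 6 | Custom image that contains FFmpeg | Stage 1 frame extraction requires a MaxCompute custom image that contains FFmpeg. The image must be in the same region as the MaxCompute project. |
| 7 | MaxFrame SDK version | Use MaxFrame SDK 2.7.1 or later. |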
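Model computing service and inference quota#
In the MaxCompute console, purchase or activate the model computing service, and confirm that an inference quota is associated before running Bailian model inference.
Custom image#
Why is a custom image needed?#
The default DPE runtime contains only Python and does not include FFmpeg, while stage 1 frame extraction depends on the FFmpeg executables. For quick validation, you can reuse an already-verified FFmpeg image in the same region. For production, build and publish your own image in the same region as the MaxCompute project, then set odps.session.image to that image name.
Example Dockerfile: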
# syntax=docker/dockerfile:1.6
FROM --platform=linux/amd64 ubuntu:22.04

ENV TZ=Asia/Shanghai \
    LANG=en_US.UTF-8 \
    LANGUAGE=en_US:en \
    LC_ALL=en_US.UTF-8 \
    PYTHONIOENCODING=utf-8 \
    TERM=xterm-256color \
    MF_PYTHON_EXECUTABLE=/usr/bin/python3

RUN sed -i \
        -e 's|http://archive.ubuntu.com/ubuntu/|http://mirrors.aliyun.com/ubuntu/|g' \
        -e 's|http://security.ubuntu.com/ubuntu/|http://mirrors.aliyun.com/ubuntu/|g' \
        /etc/apt/sources.list \
    && apt-get update \
    && DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \
        ca-certificates \
        tzdata \
        locales \
        rpm \
        sudo \
        vim \
        git \
        wget \
        curl \
        pkg-config \
        build-essential \
        gdb \
        binutils \
        telnet \
        netcat-openbsd \
        iputils-ping \
        python3 \
        python3-dev \
        python3-pip \
        python3-venv \
        ffmpeg \
        libgfortran5 \
        libgomp1 \
        libnss3 \
        libsqlite3-0 \
        libssl3 \
        zlib1g \
    && locale-gen en_US.UTF-8 \
    && rm -rf /var/lib/apt/lists/*

RUN python3 -m pip install --no-cache-dir --upgrade pip setuptools wheel \
    && python3 -m pip install --no-cache-dir \
        cloudpickle \
        numpy \
        pandas \
        requests

RUN groupadd -g 505 admin \
    && useradd -u 505 -g admin -m -d /home/admin -s /bin/bash admin \
    && mkdir -p /workspace \
    && chown -R admin:admin /home/admin /workspace \
    && update-alternatives --install /usr/bin/python python /usr/bin/python3 100 \
    && update-alternatives --install /usr/bin/pip pip /usr/bin/pip3 100

WORKDIR /workspace

RUN getconf GNU_LIBC_VERSION \
    && strings /usr/lib/x86_64-linux-gnu/libc.so.6 | grep -q '^GLIBC_2\.34$' \
    && ffmpeg -version >/dev/null \
    && ffprobe -version >/dev/null \
    && "$MF_PYTHON_EXECUTABLE" -c "import ssl, sqlite3, numpy, pandas, requests, cloudpickle; print('runtime ok')"

CMD ["/bin/bash"]
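Stage 1 calls both ffmpeg and ffprobe, so it can help to confirm from Python that the two executables resolve on PATH inside the image before submitting a job. The following is a minimal sketch of such a check; the helper name `missing_tools` is hypothetical and not part of MaxFrame.

```python
import shutil


def missing_tools(tools):
    """Return the names in `tools` that cannot be found on PATH."""
    return [name for name in tools if shutil.which(name) is None]


# Report which of the executables needed by stage 1 are unavailable.
missing = missing_tools(["ffmpeg", "ffprobe"])
if missing:
    print("missing executables:", ", ".join(missing))
else:
    print("ffmpeg and ffprobe are available")
```

Running this inside the container (or at the top of a DPE function) gives an early, readable failure instead of a subprocess error in the middle of frame extraction.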
Environment setup and session creation#
import glob
import json
import math
import os
import subprocess

import pandas as pd
import maxframe.dataframe as md
from maxframe.config import options
from maxframe.learn.contrib.llm import ImageContentType
from maxframe.learn.utils import read_odps_model
from maxframe.session import new_session
from maxframe.udf import with_fs_mount, with_running_options
from odps import ODPS

# Run the session in the custom image that contains FFmpeg.
options.sql.settings = {"odps.session.image": "<your_ffmpeg_image_name>"}
# Prefer the DPE engine and fall back to MaxCompute SQL.
options.dag.settings = {"engine_order": ["DPE", "MCSQL"]}

o = ODPS(
    access_id="<your_access_id>",
    secret_access_key="<your_secret_access_key>",
    project="<your_maxcompute_project>",
    endpoint="https://service.<region>.maxcompute.aliyun.com/api",
)
session = new_session(o)
print(f"Session ID : {session.session_id}")
print(f"LogView : {session.get_logview_address()}")
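The session code above passes the AccessKey pair as string literals. As an alternative, the pair could be read from environment variables so that it stays out of the notebook. This is a minimal sketch under that assumption: the helper `load_credentials` is hypothetical, and the variable names `ALIBABA_CLOUD_ACCESS_KEY_ID` and `ALIBABA_CLOUD_ACCESS_KEY_SECRET` are assumed here rather than taken from this guide.

```python
import os


def load_credentials(env=None):
    """Read the AccessKey pair from environment variables.

    The variable names are an assumption of this sketch; use whatever
    names your deployment defines.
    """
    env = os.environ if env is None else env
    names = ("ALIBABA_CLOUD_ACCESS_KEY_ID", "ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    # Keys match the keyword arguments used by ODPS(...) in this guide.
    return {"access_id": env[names[0]], "secret_access_key": env[names[1]]}
```

The returned dictionary can then be unpacked into the constructor, for example `ODPS(project=..., endpoint=..., **load_credentials())`.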
Global parameter configuration:
OSS_ENDPOINT = "<your_oss_endpoint>"
OSS_REGION = "<your_oss_region>"
OSS_BUCKET = "<your_oss_bucket>"
# Normalize the prefix so it has no leading or trailing slash.
OSS_PREFIX = "<your_video_prefix>/".strip("/")
OSS_ROOT = f"oss://{OSS_ENDPOINT}/{OSS_BUCKET}"
if OSS_PREFIX:
    OSS_ROOT = f"{OSS_ROOT}/{OSS_PREFIX}"
OSS_MOUNT_PATH = "/mnt/data"
OSS_ROLE_ARN = "acs:ram::<account_id>:role/<your_role_name>"
OSS_ACCESS_KEY_ID = "<your_oss_access_key_id>"
OSS_ACCESS_KEY_SECRET = "<your_oss_access_key_secret>"
STORAGE_OPTIONS = {
    "access_key_id": OSS_ACCESS_KEY_ID,
    "access_key_secret": OSS_ACCESS_KEY_SECRET,
}
VIDEO_INPUT_TABLE = "mf_video_input"
FRAME_OUTPUT_TABLE = "mf_video_frame_result"
LABEL_OUTPUT_TABLE = "mf_video_label_result"
EMB_OUTPUT_TABLE = "mf_video_embedding_result"
MODEL_PROJECT = "bigdata_public_modelset"
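Before submitting the job, confirm that the mounted OSS path is correct. For example, if your OSS address is oss://oss-cn-hangzhou.aliyuncs.com/jingxuan-oss-test-hz/, set OSS_ENDPOINT = "oss-cn-hangzhou.aliyuncs.com" and OSS_BUCKET = "jingxuan-oss-test-hz", and set OSS_PREFIX to the subdirectory that stores the videos.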
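The mapping from an OSS address to the three configuration values can be made mechanical. The sketch below splits an oss:// address into endpoint, bucket, and prefix; `split_oss_url` is a hypothetical helper written for illustration, not an SDK function.

```python
def split_oss_url(url):
    """Split oss://<endpoint>/<bucket>[/<prefix>] into its three parts."""
    scheme = "oss://"
    if not url.startswith(scheme):
        raise ValueError(f"not an oss:// URL: {url!r}")
    # At most three pieces: endpoint, bucket, and the rest as the prefix.
    parts = url[len(scheme):].strip("/").split("/", 2)
    if len(parts) < 2 or not all(parts[:2]):
        raise ValueError(f"expected oss://<endpoint>/<bucket>[/<prefix>]: {url!r}")
    endpoint, bucket = parts[0], parts[1]
    prefix = parts[2].strip("/") if len(parts) == 3 else ""
    return endpoint, bucket, prefix


endpoint, bucket, prefix = split_oss_url(
    "oss://oss-cn-hangzhou.aliyuncs.com/jingxuan-oss-test-hz/"
)
print(endpoint, bucket, repr(prefix))
```

An empty prefix means the videos sit at the bucket root, which matches how OSS_ROOT is assembled when OSS_PREFIX is empty.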
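Stage 0: build the video manifest table#
If you already have an ODPS table with a video_path column, skip this stage. Otherwise, use PyODPS and alibabacloud_oss_v2 to scan the OSS prefix and write one row per video. Stage 1 reads this table directly.
The value of video_path must use exactly the same prefix as OSS_ROOT (oss://<endpoint>/<bucket>/<prefix>/...). If the prefixes differ, the path replacement for with_fs_mount may silently fail to find the file.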
import alibabacloud_oss_v2 as oss

VIDEO_EXTS = (".mp4", ".avi", ".mov", ".mkv")


def build_video_meta(o: ODPS) -> None:
    # Build an OSS client from the static AccessKey pair configured above.
    cred = oss.credentials.StaticCredentialsProvider(
        OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET,
    )
    cfg = oss.config.load_default()
    cfg.credentials_provider = cred
    cfg.region = OSS_REGION
    cfg.endpoint = OSS_ENDPOINT
    client = oss.Client(cfg)

    # List every object under the prefix and keep only video files.
    rows = []
    list_prefix = f"{OSS_PREFIX}/" if OSS_PREFIX else ""
    paginator = client.list_objects_v2_paginator()
    for page in paginator.iter_page(
        oss.ListObjectsV2Request(bucket=OSS_BUCKET, prefix=list_prefix),
    ):
        for obj in page.contents or []:
            key = obj.key
            if not key.lower().endswith(VIDEO_EXTS):
                continue
            rows.append({
                "video_path": f"oss://{OSS_ENDPOINT}/{OSS_BUCKET}/{key}",
                "size_bytes": int(obj.size or 0),
                "last_modified": str(obj.last_modified or ""),
            })
    print(f"Found {len(rows)} videos under {OSS_ROOT}")
    if not rows:
        raise RuntimeError("no videos found under OSS_PREFIX")

    # Recreate the manifest table and write one row per video.
    o.delete_table(VIDEO_INPUT_TABLE, if_exists=True)
    o.create_table(
        VIDEO_INPUT_TABLE,
        schema="video_path string, size_bytes bigint, last_modified string",
        if_not_exists=True,
    )
    with o.get_table(VIDEO_INPUT_TABLE).open_writer() as w:
        w.write([
            (r["video_path"], r["size_bytes"], r["last_modified"])
            for r in rows
        ])
    print(f"Wrote {len(rows)} rows to {o.project}.{VIDEO_INPUT_TABLE}")


build_video_meta(o)
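When the manifest table comes from another source, the prefix requirement on video_path can be checked locally before stage 1 runs. The sketch below partitions paths by whether they fall under the root and shows the OSS-to-mount mapping; `to_local_path` and `split_by_root` are hypothetical helpers, and the example root and paths are made up for illustration.

```python
def to_local_path(video_path, oss_root, mount_path):
    """Map an OSS URL under `oss_root` to its path under `mount_path`."""
    root = oss_root.rstrip("/") + "/"
    if not video_path.startswith(root):
        raise ValueError(f"path_outside_root: {video_path}")
    return mount_path.rstrip("/") + "/" + video_path[len(root):]


def split_by_root(video_paths, oss_root):
    """Partition paths into (inside_root, outside_root) lists."""
    root = oss_root.rstrip("/") + "/"
    inside = [p for p in video_paths if p.startswith(root)]
    outside = [p for p in video_paths if not p.startswith(root)]
    return inside, outside


# Illustrative values only.
example_root = "oss://ep/bucket/videos"
inside, outside = split_by_root(
    ["oss://ep/bucket/videos/a/clip.mp4", "oss://ep/bucket/videos2/b.mp4"],
    example_root,
)
print(to_local_path(inside[0], example_root, "/mnt/data"))
print("outside root:", outside)
```

Comparing against the root with a trailing slash avoids treating a sibling prefix such as videos2 as part of videos.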
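Stage 1: video frame extraction#
Stage 1 uses mf.apply_chunk to distribute the video manifest table, where each row represents one video. Inside the DPE worker, with_fs_mount mounts the OSS prefix locally; FFprobe validates the video duration, FFmpeg extracts JPEG frames, and the frames are written back to <OSS_ROOT>/frames/.
The output schema is aligned with the AI FUNC image input contract. Downstream stages can use image_id and image_url directly without renaming columns.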
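| Column | Type | Description |
|---|---|---|
| video_path | string | OSS path of the source video, used for lineage tracing. |
| frame_idx | bigint | Frame index. Failed rows use -1. |
| image_id | string | |
| image_url | string | OSS URL of the frame JPEG; can be passed directly to AI FUNC. |
| status | string | |
| error_stage | string | Name of the failed stage. |
| error_msg | string | Failure reason. |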
@with_running_options(engine="dpe", cpu=2, memory=8)
@with_fs_mount(
    path=OSS_ROOT,
    mount_path=OSS_MOUNT_PATH,
    storage_options={"role_arn": OSS_ROLE_ARN},
)
def frame_extraction_chunk(chunk: pd.DataFrame) -> pd.DataFrame[
    {"video_path": "object", "frame_idx": "int64", "image_id": "object", "image_url": "object",
     "status": "object", "error_stage": "object", "error_msg": "object"}
]:
    rows = []
    frame_fps = 2  # frames extracted per second of video
    ffmpeg_timeout_sec = 300
    for _, row in chunk.iterrows():
        video_path = row["video_path"]
        try:
            # Map the OSS URL to the corresponding path under the mount point.
            if not video_path.startswith(f"{OSS_ROOT}/"):
                raise ValueError("path_outside_root")
            local_video_path = video_path.replace(
                f"{OSS_ROOT}/", f"{OSS_MOUNT_PATH}/", 1,
            )
            local_output_dir = os.path.join(OSS_MOUNT_PATH, "frames")
            os.makedirs(local_output_dir, exist_ok=True)
            video_name = video_path.rsplit("/", 1)[1]
            video_basename = video_name.rsplit(".", 1)[0]
            local_output_pattern = os.path.join(
                local_output_dir, f"{video_basename}_%04d.jpg",
            )

            # Validate the video by reading its duration with ffprobe.
            probe = subprocess.run(
                [
                    "ffprobe", "-v", "error",
                    "-show_entries", "format=duration",
                    "-of", "default=noprint_wrappers=1:nokey=1",
                    local_video_path,
                ],
                capture_output=True, text=True, check=False,
            )
            if probe.returncode != 0:
                raise RuntimeError(f"ffprobe_failed: {probe.stderr.strip()[:200]}")
            try:
                duration = float(probe.stdout.strip())
            except ValueError as exc:
                raise ValueError("invalid_duration") from exc
            if not math.isfinite(duration) or duration <= 0:
                raise ValueError("invalid_duration")

            # Extract JPEG frames at frame_fps, numbered from 0.
            ff = subprocess.run(
                [
                    "ffmpeg", "-y",
                    "-i", local_video_path,
                    "-vf", f"fps={frame_fps}",
                    "-q:v", "2",
                    "-start_number", "0",
                    local_output_pattern,
                ],
                capture_output=True, text=True,
                timeout=ffmpeg_timeout_sec, check=False,
            )
            if ff.returncode != 0:
                raise RuntimeError(f"ffmpeg_failed: {ff.stderr.strip()[:200]}")

            # Emit one output row per extracted frame.
            frame_files = sorted(
                glob.glob(local_output_pattern.replace("%04d", "*"))
            )
            if not frame_files:
                raise ValueError("no_frames_found")
            for frame_idx, _ in enumerate(frame_files):
                rows.append({
                    "video_path": video_path,
                    "frame_idx": frame_idx,
                    "image_id": f"{video_basename}_{frame_idx:04d}",
                    "image_url": (
                        f"{OSS_ROOT}/frames/"
                        f"{video_basename}_{frame_idx:04d}.jpg"
                    ),
                    "status": "ok",
                    "error_stage": "",
                    "error_msg": "",
                })
        except Exception as exc:
            # Record the failure as a row so that one bad video does not fail the chunk.
            rows.append({
                "video_path": video_path,
                "frame_idx": -1,
                "image_id": "",
                "image_url": "",
                "status": "failed",
                "error_stage": "frame_extraction",
                "error_msg": str(exc)[:512],
            })
    return pd.DataFrame(rows)


video_df = md.read_odps_table(VIDEO_INPUT_TABLE)
frame_df = video_df.mf.apply_chunk(frame_extraction_chunk)
md.to_odps_table(frame_df, FRAME_OUTPUT_TABLE, overwrite=True).execute()
print(o.get_table(FRAME_OUTPUT_TABLE).to_df().head(10))
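The stage 1 code writes every frame to <OSS_ROOT>/frames/ and names it after the video's base name only, so two videos that share a file name in different subdirectories (or differ only by extension) would write to the same frame files. A pre-check along the following lines could flag such cases before the job runs. It is a minimal sketch: `find_basename_collisions` is a hypothetical helper, and the example paths are made up.

```python
from collections import defaultdict


def find_basename_collisions(video_paths):
    """Group paths by base name (file name without its last extension)
    and return only the groups that contain more than one path."""
    groups = defaultdict(list)
    for path in video_paths:
        name = path.rsplit("/", 1)[-1]
        basename = name.rsplit(".", 1)[0]
        groups[basename].append(path)
    return {base: paths for base, paths in groups.items() if len(paths) > 1}


# Illustrative paths only.
collisions = find_basename_collisions([
    "oss://ep/b/v/day1/clip.mp4",
    "oss://ep/b/v/day2/clip.mp4",
    "oss://ep/b/v/day1/other.mov",
])
for base, paths in collisions.items():
    print(f"{base}: {len(paths)} videos share this frame prefix")
```

The base-name derivation mirrors the two rsplit calls in frame_extraction_chunk, so a reported collision corresponds to an actual overlap in output file names.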
Stage 2: image labeling#
Load the multimodal model qwen3.6-plus through AI FUNC and generate a structured description for each successfully extracted frame.
# The prompt is kept in Chinese because it asks the model to answer in Chinese.
# It requests a structured description of the road scene (road environment,
# weather and lighting, traffic participants, key events) as JSON with the
# fields scene_type, weather, lighting, road_type, key_objects, risk_level,
# and description.
label_prompt = (
    "这是一张自动驾驶车辆摄像头采集的道路场景图片。请对画面内容进行结构化描述,包括以下方面:"
    "1)道路环境:道路类型(高速/城区/乡村)、车道数、路面状况、交通标志标线;"
    "2)天气与光照:天气状况(晴/阴/雨/雪/雾)、光照条件(白天/夜间/逆光/隧道);"
    "3)交通参与者:车辆类型与数量、行人、非机动车、特殊车辆(工程车/公交等);"
    "4)关键事件:是否存在异常行为(急刹、变道、闯红灯)、潜在风险、长尾场景。"
    "请用中文输出 JSON 格式,字段包括:scene_type, weather, lighting, road_type, "
    "key_objects, risk_level, description。"
)
vlm_model = read_odps_model("qwen3.6-plus", project=MODEL_PROJECT)
cp = vlm_model.content_part

# Only frames that were extracted successfully are sent to the model.
frame_ok = frame_df[frame_df["status"] == "ok"][
    ["video_path", "frame_idx", "image_id", "image_url"]
]
label_df = vlm_model.generate(
    frame_ok,
    messages=[
        {
            "role": "user",
            "content": [
                cp.text(label_prompt),
                cp.image(
                    data=frame_ok["image_url"],
                    type=ImageContentType.IMAGE_URL,
                    storage_options=STORAGE_OPTIONS,
                ),
            ]
        }
    ],
    simple_output=False,
    params={"max_tokens": 1024},
    running_options={"enable_real_rpm_stats": True},
)

# Put the model output next to the frame metadata, column-wise.
label_combined = md.concat(
    [
        frame_ok,
        label_df.rename(columns={
            "response": "label_response",
            "success": "label_success",
        }),
    ],
    axis=1,
)
def parse_label(chunk: pd.DataFrame) -> pd.DataFrame[
    {"video_path": "object", "frame_idx": "int64", "image_id": "object", "image_url": "object",
     "label_text": "object", "label_input_token": "int64", "label_output_token": "int64",
     "status": "object", "error_stage": "object", "error_msg": "object"}
]:
    rows = []
    for _, r in chunk.iterrows():
        base = {
            "video_path": r["video_path"],
            "frame_idx": int(r["frame_idx"]),
            "image_id": r["image_id"],
            "image_url": r["image_url"],
            "label_text": None,
            "label_input_token": 0,
            "label_output_token": 0,
            "status": "ok",
            "error_stage": "",
            "error_msg": "",
        }
        # The model call itself failed: keep the raw response as the error message.
        if not r["label_success"]:
            base.update(
                status="failed",
                error_stage="label",
                error_msg=str(r["label_response"])[:512],
            )
            rows.append(base)
            continue
        try:
            # The response may arrive as a JSON string or as a parsed object.
            resp = (
                json.loads(r["label_response"])
                if isinstance(r["label_response"], str)
                else r["label_response"]
            )
            base["label_text"] = resp["choices"][0]["message"]["content"]
            usage = resp.get("usage", {}) or {}
            base["label_input_token"] = int(usage.get("prompt_tokens", 0) or 0)
            base["label_output_token"] = int(
                usage.get("completion_tokens", 0) or 0
            )
        except Exception as exc:
            base.update(
                status="failed",
                error_stage="label",
                error_msg=f"label_parse: {exc}",
            )
        rows.append(base)
    return pd.DataFrame(rows)


label_result = label_combined.mf.apply_chunk(parse_label)
md.to_odps_table(label_result, LABEL_OUTPUT_TABLE, overwrite=True).execute()
print(o.get_table(LABEL_OUTPUT_TABLE).to_df().head(5))
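The label_text column holds the model's reply as free text, even though the prompt asks for JSON. For downstream filtering it may be useful to turn that reply into a dictionary. The sketch below is one tolerant way to do so, assuming the reply contains a single JSON object that may be wrapped in a Markdown code fence or surrounded by extra prose; `extract_label_json` and `missing_fields` are hypothetical helpers, and the field list is copied from the prompt.

```python
import json

# Field names requested by the labeling prompt.
EXPECTED_FIELDS = (
    "scene_type", "weather", "lighting", "road_type",
    "key_objects", "risk_level", "description",
)


def extract_label_json(label_text):
    """Return the JSON object embedded in `label_text`, or None.

    Takes the span from the first "{" to the last "}" so that a reply
    wrapped in a code fence or extra prose can still be parsed.
    """
    if not isinstance(label_text, str):
        return None
    start, end = label_text.find("{"), label_text.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        obj = json.loads(label_text[start:end + 1])
    except json.JSONDecodeError:
        return None
    return obj if isinstance(obj, dict) else None


def missing_fields(label):
    """List the expected fields that are absent from a parsed label."""
    return [name for name in EXPECTED_FIELDS if name not in label]


reply = 'Result:\n{"scene_type": "urban", "weather": "rain"}\nDone.'
label = extract_label_json(reply)
print(label, missing_fields(label))
```

Returning None instead of raising keeps the helper easy to apply row by row; rows that fail to parse can then be routed to a review queue rather than dropped.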
Stage 3: image vectorization#
Load a multimodal embedding model (qwen3-vl-embedding by default) and generate image vectors for the successfully labeled images.
embedding_dim = 1024
embedding_model = read_odps_model("qwen3-vl-embedding", project=MODEL_PROJECT)
emb_cp = embedding_model.content_part

# Only frames that were labeled successfully are embedded.
emb_input = label_result[label_result["status"] == "ok"][
    ["video_path", "frame_idx", "image_id", "image_url", "label_text"]
]
image_emb_df = embedding_model.embed(
    emb_input,
    input=[
        emb_cp.image(
            data=emb_input["image_url"],
            type=ImageContentType.IMAGE_URL,
            storage_options=STORAGE_OPTIONS,
        ),
    ],
    simple_output=False,
    params={"timeout": 120, "dimension": embedding_dim},
    running_options={"enable_real_rpm_stats": True},
)

# Put the embedding output next to the frame metadata, column-wise.
emb_combined = md.concat(
    [
        emb_input,
        image_emb_df.rename(columns={
            "response": "image_embedding_response",
            "success": "image_embedding_success",
        }),
    ],
    axis=1,
)
def parse_embedding(chunk: pd.DataFrame) -> pd.DataFrame[
    {"video_path": "object", "frame_idx": "int64", "image_id": "object", "image_url": "object",
     "label_text": "object", "image_embedding": "object",
     "image_embedding_total_token": "int64",
     "status": "object", "error_stage": "object", "error_msg": "object"}
]:
    rows = []
    for _, r in chunk.iterrows():
        base = {
            "video_path": r["video_path"],
            "frame_idx": int(r["frame_idx"]),
            "image_id": r["image_id"],
            "image_url": r["image_url"],
            "label_text": r["label_text"],
            "image_embedding": None,
            "image_embedding_total_token": 0,
            "status": "ok",
            "error_stage": "",
            "error_msg": "",
        }
        # The model call itself failed: keep the raw response as the error message.
        if not r["image_embedding_success"]:
            base.update(
                status="failed",
                error_stage="image_embedding",
                error_msg=str(r["image_embedding_response"])[:512],
            )
            rows.append(base)
            continue
        try:
            # The response may arrive as a JSON string or as a parsed object.
            resp = (
                json.loads(r["image_embedding_response"])
                if isinstance(r["image_embedding_response"], str)
                else r["image_embedding_response"]
            )
            embedding = resp["output"]["embeddings"][0]["embedding"]
            # Store the vector as a compact JSON string.
            base["image_embedding"] = json.dumps(
                embedding, ensure_ascii=False, separators=(",", ":"),
            )
            usage = resp.get("usage", {}) or {}
            base["image_embedding_total_token"] = int(
                usage.get("total_tokens", 0) or 0
            )
        except Exception as exc:
            base.update(
                status="failed",
                error_stage="image_embedding",
                error_msg=f"emb_parse: {exc}",
            )
        rows.append(base)
    return pd.DataFrame(rows)


emb_result = emb_combined.mf.apply_chunk(parse_embedding)
md.to_odps_table(emb_result, EMB_OUTPUT_TABLE, overwrite=True).execute()
print(o.get_table(EMB_OUTPUT_TABLE).to_df().head(5))
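Because parse_embedding stores each vector as a JSON string in image_embedding, a small sample of the result table can be used for similar-scene retrieval without extra infrastructure. The sketch below ranks frames by cosine similarity in plain Python. It assumes the rows have already been downloaded as (image_id, image_embedding) pairs; `cosine_similarity` and `top_k_similar` are hypothetical helpers, and a brute-force scan like this only suits small samples.

```python
import json
import math


def cosine_similarity(a, b):
    """Cosine similarity of two equal-length vectors; 0.0 if either is all zeros."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k_similar(query, rows, k=5):
    """Rank (image_id, embedding_json) pairs by similarity to `query`."""
    scored = [
        (image_id, cosine_similarity(query, json.loads(embedding_json)))
        for image_id, embedding_json in rows
    ]
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored[:k]


# Toy two-dimensional vectors stand in for real 1024-dimensional embeddings.
sample_rows = [("a", "[1,0]"), ("b", "[0,1]"), ("c", "[1,1]")]
for image_id, score in top_k_similar([1, 0], sample_rows, k=2):
    print(image_id, round(score, 4))
```

For full-table retrieval a dedicated vector index would be the more practical choice; the sketch is meant only to show how the stored column can be decoded and compared.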
Resource cleanup#
print(f"LogView: {session.get_logview_address()}")
session.destroy()
print("Session destroyed.")