# 视频处理全流程:抽帧 -> 打标 -> 向量化
## 背景信息
自动驾驶车辆在研发和运营过程中会持续采集海量摄像头视频数据。这些数据覆盖不同道路类型、交通流、天气光照和复杂事件,是模型训练、场景挖掘、数据闭环和效果评测的重要基础。
然而,原始视频通常缺乏结构化标签,人工筛选和整理成本高,导致高价值样本发现难、数据复用效率低。为提升海量视频数据的管理和利用效率,需要对视频进行帧级理解与标准化加工:
视频抽帧:通过 FFmpeg 按指定帧率抽取关键帧图片。
图片打标:调用百炼多模态大模型自动生成场景标签和语义描述。
向量化:提取 Embedding 向量,形成可用于检索、聚类、样本筛选和分析的统一特征底座。
本方案依托 MaxFrame 的分布式处理能力,实现视频数据的端到端批量处理,帮助将非结构化视频数据转化为可检索、可分析、可复用的数据资产。
## 适用场景
海量视频数据自动打标与管理。
相似场景检索与样本召回。
异常事件与长尾场景发现。
数据聚类分析与分布评估。
训练、评测与数据闭环支撑。
## 方案核心流程
原始视频(OSS)→ 阶段一:FFmpeg 抽帧 → 阶段二:百炼多模态大模型打标 → 阶段三:向量化 → 各阶段结果写入 MaxCompute 表。
## 前提条件
| # | 条件 | 说明 |
|---|---|---|
| 1 | 开通 MaxCompute | 需要一个已开通的 MaxCompute 项目,并具备有效的 Access ID / Access Key。 |
| 2 | 开通 DPE 引擎 | 本方案各阶段的 UDF 均以 engine="dpe" 运行,需提前为项目开通 DPE 引擎。 |
| 3 | 上传视频至 OSS | 原始视频已存储在目标 OSS Bucket 中。 |
| 4 | OSS RAM 角色授权 | 配置用于 OSS 挂载访问的 Role ARN。 |
| 5 | 百炼 API Key | 用于多模态打标与向量化模型调用。 |
| 6 | 包含 FFmpeg 的自定义镜像 | 阶段一抽帧需要使用包含 FFmpeg 的 MaxCompute 自定义镜像。该镜像必须和 MaxCompute 项目处于同一地域。 |
| 7 | MaxFrame SDK 版本 | 请使用 MaxFrame SDK 2.6.0 及以上版本。 |
## 自定义镜像
### 为什么需要自定义镜像?
默认 DPE 运行时仅包含 Python,不包含 FFmpeg;阶段一抽帧依赖 FFmpeg 可执行文件。快速验证时,可以复用同地域已经验证过的 FFmpeg 镜像。生产环境建议自行构建并发布与 MaxCompute 项目同地域的镜像,然后将 odps.session.image 设置为该镜像名称。
Dockerfile 示例:
FROM registry.cn-zhangjiakou.aliyuncs.com/maxcompute_image/ubuntu_20.04:latest

# The ossfs2 package must be present in the build context next to this file.
COPY ossfs2_2.0.3.1_linux_x86_64.deb /tmp/

# Install FFmpeg and the local ossfs2 package in a single layer: the package
# lists are still available so the .deb's dependencies can be resolved, and
# `-y` keeps the non-interactive build from stopping at a confirmation prompt.
RUN apt-get update \
    && DEBIAN_FRONTEND=noninteractive apt-get install -y \
        ffmpeg \
        /tmp/ossfs2_2.0.3.1_linux_x86_64.deb \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /workspace
ENV MF_PYTHON_EXECUTABLE="/usr/ali/python3.11.7/bin/python3"
## 环境准备与会话创建
import glob
import json
import math
import os
import subprocess
import numpy as np
import pandas as pd
import maxframe.dataframe as md
from maxframe.config import options
from maxframe.session import new_session
from maxframe.udf import with_fs_mount, with_python_requirements, with_running_options
from maxframe.dataframe.utils import parse_index
from odps import ODPS
# Session image: the custom image that bundles FFmpeg (needed by stage one).
sql_settings = {"odps.session.image": "<your_ffmpeg_image_name>"}
options.sql.settings = sql_settings

# Engine order setting: DPE is listed before MCSQL.
dag_settings = {"engine_order": ["DPE", "MCSQL"]}
options.dag.settings = dag_settings

# Whitelist the DashScope intranet endpoint that the labeling and embedding
# UDFs call.
dashscope_whitelist = ["intranet-cn-beijing.dashscope.aliyuncs.com:443"]
options.dpe.settings = {
    "substep.internal_network_whitelist": dashscope_whitelist,
}

# MaxCompute entry object; replace every placeholder with real values.
odps_kwargs = {
    "access_id": "<your_access_id>",
    "secret_access_key": "<your_secret_access_key>",
    "project": "<your_maxcompute_project>",
    "endpoint": "https://service.<region>.maxcompute.aliyun.com/api",
}
o = ODPS(**odps_kwargs)

# Open the MaxFrame session and show where to follow its progress.
session = new_session(o)
session_id = session.session_id
print(f"Session ID : {session_id}")
logview_address = session.get_logview_address()
print(f"LogView : {logview_address}")
全局参数配置:
# --- OSS location of the source videos -------------------------------------
OSS_ENDPOINT = "<your_oss_endpoint>"  # for example: oss-cn-hangzhou.aliyuncs.com
OSS_BUCKET = "<your_oss_bucket>"
OSS_PATH = "<your_video_prefix>/"
ROLE_ARN = "acs:ram::<account_id>:role/<your_role_name>"

# --- DashScope (Bailian) credentials and models -----------------------------
DASHSCOPE_API_KEY = "<your_dashscope_api_key>"
LABEL_MODEL = "qwen3-vl-plus"
EMBEDDING_MODEL = "qwen3-vl-embedding"
EMBEDDING_DIM = 1024

# Prompt sent together with every frame in the labeling stage; the fragments
# are joined with single spaces into one string.
LABEL_PROMPT = " ".join(
    [
        "This image is captured from an autonomous driving scenario.",
        "Identify factors that may affect driving, including lane quality, signs,",
        "road conditions, obstacles, poor weather, poor lighting, complex traffic",
        "patterns, and unusual events. Provide detailed scene factors only.",
    ]
)
提交任务前,请确认挂载的 OSS path 正确。例如,如果你的 OSS 地址是 oss://oss-cn-hangzhou.aliyuncs.com/jingxuan-oss-test-hz/,则设置 OSS_ENDPOINT = "oss-cn-hangzhou.aliyuncs.com"、OSS_BUCKET = "jingxuan-oss-test-hz",并将 OSS_PATH 设为 "" 或实际存放视频的子目录。
## 阶段一:视频抽帧
@with_running_options(engine="dpe", cpu=1, memory=4)
@with_fs_mount(
    path=f"oss://{OSS_ENDPOINT}/{OSS_BUCKET}/{OSS_PATH}",
    mount_path="/mnt/data",
    storage_options={"role_arn": ROLE_ARN},
)
def extract_frame(batch_df, fps=2, quality=2, timeout=300):
    """Extract JPEG frames from every video under the OSS mount with FFmpeg.

    Walks ``/mnt/data`` for ``.mp4`` / ``.avi`` / ``.mov`` files, runs
    ``ffmpeg`` once per video and writes the frames to
    ``/mnt/data/output_frames/<video path without extension>/%04d.jpg``.
    A video that fails or times out is reported on stderr and skipped.

    Args:
        batch_df: Input chunk. Its content is not read; it only triggers the
            call.
        fps: Value of the FFmpeg ``fps`` video filter.
        quality: Value passed to FFmpeg's ``-q:v`` option.
        timeout: Per-video FFmpeg timeout in seconds.

    Returns:
        pandas.DataFrame with columns ``image_oss_bucket``, ``image_oss_path``
        (frame directory relative to the mount root, with a trailing ``/``),
        ``image_oss_name`` and ``size`` (bytes). Empty, but with the same
        columns, when no frame was produced.
    """
    import sys

    mount_root = "/mnt/data"
    output_dirname = "output_frames"
    output_root = os.path.join(mount_root, output_dirname)
    video_extensions = (".mp4", ".avi", ".mov")
    frame_columns = ["image_oss_bucket", "image_oss_path", "image_oss_name", "size"]

    video_files = []
    for root, dirs, files in os.walk(mount_root):
        if root == mount_root:
            # Never descend into our own output tree: it only holds frames
            # and can contain a very large number of files.
            dirs[:] = [d for d in dirs if d != output_dirname]
        video_files.extend(
            os.path.join(root, name)
            for name in files
            if name.lower().endswith(video_extensions)
        )
    # Sorted so the collision suffixes below are assigned deterministically.
    video_files.sort()

    used_dirs = set()
    all_results = []
    for video_path in video_files:
        # Mirror the video's path relative to the mount so that videos with
        # the same basename in different folders do not share (and overwrite)
        # one frame directory. Top-level videos keep ``output_frames/<name>``.
        rel_stem = os.path.splitext(os.path.relpath(video_path, mount_root))[0]
        base_dir = os.path.join(output_root, rel_stem)
        output_dir, attempt = base_dir, 1
        while output_dir in used_dirs:
            # Same stem with a different extension (a.mp4 / a.avi).
            attempt += 1
            output_dir = f"{base_dir}_{attempt}"
        used_dirs.add(output_dir)
        os.makedirs(output_dir, exist_ok=True)

        cmd = [
            "ffmpeg", "-i", video_path,
            "-vf", f"fps={fps}",
            "-q:v", str(quality),
            "-f", "image2",
            os.path.join(output_dir, "%04d.jpg"),
        ]
        try:
            process = subprocess.run(
                cmd,
                stdin=subprocess.DEVNULL,  # FFmpeg must never wait for input
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
                timeout=timeout,
            )
        except Exception as exc:
            # Best effort: one broken video must not abort the whole batch,
            # but the reason is reported instead of being dropped.
            print(f"[extract_frame] skipped {video_path}: {exc!r}", file=sys.stderr)
            continue
        if process.returncode != 0:
            stderr_tail = process.stderr.decode("utf-8", errors="replace")[-500:]
            print(
                f"[extract_frame] ffmpeg exited with {process.returncode} "
                f"for {video_path}: {stderr_tail}",
                file=sys.stderr,
            )
            continue

        rel_dir = os.path.relpath(output_dir, mount_root) + "/"
        # glob.escape keeps names containing '[', '*' or '?' from being
        # interpreted as pattern characters.
        # NOTE: frames left in this directory by an earlier run are listed too.
        frame_pattern = os.path.join(glob.escape(output_dir), "*.jpg")
        for frame in sorted(glob.glob(frame_pattern)):
            all_results.append(
                [OSS_BUCKET, rel_dir, os.path.basename(frame), os.path.getsize(frame)]
            )

    # An empty list still yields a frame carrying the expected columns.
    return pd.DataFrame(all_results, columns=frame_columns)
# One-row seed frame whose only job is to give apply_chunk a chunk to call
# extract_frame with.
seed_df = md.DataFrame(pd.DataFrame({"trigger": [1]}))

# Output schema of extract_frame.
frame_dtypes = {
    "image_oss_bucket": np.dtype("str"),
    "image_oss_path": np.dtype("str"),
    "image_oss_name": np.dtype("str"),
    "size": np.dtype("int"),
}
frame_index = parse_index(pd.Index([], dtype=np.int64))

frame_result = seed_df.mf.apply_chunk(
    extract_frame,
    output_type="dataframe",
    dtypes=frame_dtypes,
    skip_infer=True,
    index=frame_index,
)

# Write the frame listing to a MaxCompute table (without the index column).
frame_write = md.to_odps_table(
    frame_result, "mf_video_frame_result", overwrite=True, index=False
)
frame_write.execute()
## 阶段二:图片打标
> **备注**:在大规模生产场景中,建议申请更高的 API 并发配额,以避免 429/403 限流。
@with_fs_mount(
    path=f"oss://{OSS_ENDPOINT}/{OSS_BUCKET}/{OSS_PATH}",
    mount_path="/mnt/data",
    storage_options={"role_arn": ROLE_ARN},
)
@with_python_requirements("dashscope>=1.24.6")
@with_running_options(engine="dpe", cpu=1, memory=4)
def labeling_chunk(chunk, api_key=None, model=LABEL_MODEL, prompt=None, max_tokens=250):
    """Label every frame of a chunk with a DashScope multimodal model.

    Each frame is read from ``/mnt/data/<image_oss_path><image_oss_name>``,
    sent as a base64 data URI together with the prompt, and the text of the
    first choice becomes the label. A failure on one row is recorded in that
    row and does not stop the chunk.

    Args:
        chunk: pandas DataFrame with ``image_oss_path`` and ``image_oss_name``
            columns.
        api_key: DashScope API key; falls back to ``DASHSCOPE_API_KEY``.
        model: Name of the model to call.
        prompt: Prompt text; falls back to ``LABEL_PROMPT``.
        max_tokens: ``max_tokens`` value passed to the model call.

    Returns:
        pandas.DataFrame with columns ``image_oss_path``, ``image_oss_name``,
        ``label``, ``status`` (``"succeed"`` / ``"failed"``), ``error_stage``
        and ``error_msg``. The columns are present even for an empty chunk.
    """
    import base64
    from http import HTTPStatus

    import dashscope

    dashscope.base_http_api_url = "https://intranet-cn-beijing.dashscope.aliyuncs.com/api/v1"
    _api_key = api_key or DASHSCOPE_API_KEY
    _prompt = prompt or LABEL_PROMPT

    result_columns = [
        "image_oss_path",
        "image_oss_name",
        "label",
        "status",
        "error_stage",
        "error_msg",
    ]
    mime_by_suffix = {".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".png": "image/png"}

    rows = []
    for _, row in chunk.iterrows():
        # Start from the failure shape; only a fully successful call flips it.
        record = {
            "image_oss_path": row["image_oss_path"],
            "image_oss_name": row["image_oss_name"],
            "label": "",
            "status": "failed",
            "error_stage": "label",
            "error_msg": "",
        }
        try:
            img_path = f"/mnt/data/{row['image_oss_path']}{row['image_oss_name']}"
            suffix = os.path.splitext(img_path)[1].lower()
            mime = mime_by_suffix.get(suffix, "image/jpeg")
            with open(img_path, "rb") as f:
                image_content = f"data:{mime};base64,{base64.b64encode(f.read()).decode()}"
            resp = dashscope.MultiModalConversation.call(
                api_key=_api_key,
                model=model,
                messages=[{"role": "user", "content": [{"image": image_content}, {"text": _prompt}]}],
                enable_thinking=False,
                max_tokens=max_tokens,
            )
            if resp.status_code != HTTPStatus.OK:
                raise RuntimeError(f"[{resp.status_code}] {resp.message}")
            label = resp.output["choices"][0]["message"]["content"][0]["text"].strip()
        except Exception as e:
            # Per-row best effort: keep the error next to the frame it belongs to.
            record["error_msg"] = str(e)
        else:
            record.update(label=label, status="succeed", error_stage="")
        rows.append(record)

    # Explicit columns keep the declared schema when the chunk has no rows;
    # pd.DataFrame([]) alone would return a frame without any columns.
    return pd.DataFrame(rows, columns=result_columns)
# Columns handed to the labeling UDF.
# NOTE(review): frame_result is a lazy MaxFrame object that stage one already
# wrote out — confirm whether reusing it here re-runs the extraction.
label_input = frame_result[["image_oss_path", "image_oss_name", "size"]]

# Every output column of labeling_chunk is a string column.
label_columns = (
    "image_oss_path",
    "image_oss_name",
    "label",
    "status",
    "error_stage",
    "error_msg",
)
label_dtypes = {column: "object" for column in label_columns}

label_result = label_input.mf.apply_chunk(
    labeling_chunk,
    output_type="dataframe",
    dtypes=label_dtypes,
)

# Write the labels to a MaxCompute table.
label_write = md.to_odps_table(label_result, "mf_video_label_result", overwrite=True)
label_write.execute()
## 阶段三:向量化
@with_fs_mount(
    path=f"oss://{OSS_ENDPOINT}/{OSS_BUCKET}/{OSS_PATH}",
    mount_path="/mnt/data",
    storage_options={"role_arn": ROLE_ARN},
)
@with_python_requirements("dashscope>=1.24.6")
@with_running_options(engine="dpe", cpu=1, memory=4)
def embedding_chunk(chunk, api_key=None, model=EMBEDDING_MODEL, dimension=EMBEDDING_DIM):
    """Embed the label text and the image of every frame in a chunk.

    Each frame is read from ``/mnt/data/<image_oss_path><image_oss_name>``.
    The label and the image (as a base64 data URI) are embedded with two
    separate DashScope calls, and both vectors are stored as compact JSON
    strings. A failure on one row is recorded in that row and does not stop
    the chunk.

    Args:
        chunk: pandas DataFrame with ``image_oss_path``, ``image_oss_name``
            and ``label`` columns.
        api_key: DashScope API key; falls back to ``DASHSCOPE_API_KEY``.
        model: Name of the embedding model to call.
        dimension: Requested vector length; every returned vector is checked
            against it.

    Returns:
        pandas.DataFrame with columns ``image_oss_path``, ``image_oss_name``,
        ``label_embedding``, ``image_embedding``, ``status`` (``"succeed"`` /
        ``"failed"``), ``error_stage`` and ``error_msg``. The columns are
        present even for an empty chunk.
    """
    import base64
    from http import HTTPStatus

    import dashscope

    dashscope.base_http_api_url = "https://intranet-cn-beijing.dashscope.aliyuncs.com/api/v1"
    _api_key = api_key or DASHSCOPE_API_KEY

    result_columns = [
        "image_oss_path",
        "image_oss_name",
        "label_embedding",
        "image_embedding",
        "status",
        "error_stage",
        "error_msg",
    ]
    mime_by_suffix = {".jpg": "image/jpeg", ".jpeg": "image/jpeg", ".png": "image/png"}

    def _embed(input_data):
        """Return the first embedding DashScope produces for one input item."""
        resp = dashscope.MultiModalEmbedding.call(
            api_key=_api_key, model=model, input=[input_data], dimension=dimension
        )
        if resp.status_code != HTTPStatus.OK:
            raise RuntimeError(f"[{resp.status_code}] {resp.message}")
        return resp.output["embeddings"][0]["embedding"]

    def _validate(vector):
        """Convert the vector to floats and check it has ``dimension`` items."""
        vals = [float(v) for v in vector]
        if len(vals) != dimension:
            raise ValueError(f"dimension mismatch: expected {dimension}, got {len(vals)}")
        return vals

    def _to_json(vector):
        """Serialize a vector as JSON without whitespace."""
        return json.dumps(vector, separators=(",", ":"))

    rows = []
    for _, row in chunk.iterrows():
        # Start from the failure shape; only a fully successful row flips it.
        record = {
            "image_oss_path": row["image_oss_path"],
            "image_oss_name": row["image_oss_name"],
            "label_embedding": "",
            "image_embedding": "",
            "status": "failed",
            "error_stage": "embedding",
            "error_msg": "",
        }
        try:
            img_path = f"/mnt/data/{row['image_oss_path']}{row['image_oss_name']}"
            suffix = os.path.splitext(img_path)[1].lower()
            mime = mime_by_suffix.get(suffix, "image/jpeg")
            with open(img_path, "rb") as f:
                image_content = f"data:{mime};base64,{base64.b64encode(f.read()).decode()}"
            label_emb = _validate(_embed({"text": row["label"]}))
            image_emb = _validate(_embed({"image": image_content}))
            label_json = _to_json(label_emb)
            image_json = _to_json(image_emb)
        except Exception as e:
            # Per-row best effort: keep the error next to the frame it belongs to.
            record["error_msg"] = str(e)
        else:
            record.update(
                label_embedding=label_json,
                image_embedding=image_json,
                status="succeed",
                error_stage="",
            )
        rows.append(record)

    # Explicit columns keep the declared schema when the chunk has no rows;
    # pd.DataFrame([]) alone would return a frame without any columns.
    return pd.DataFrame(rows, columns=result_columns)
# Only frames that were labeled successfully are embedded.
# NOTE(review): label_result is a lazy MaxFrame object that stage two already
# wrote out — confirm whether reusing it here repeats the labeling calls.
labeled_ok = label_result["status"] == "succeed"
emb_df = label_result[labeled_ok][["image_oss_path", "image_oss_name", "label"]]

# Every output column of embedding_chunk is a string column.
emb_columns = (
    "image_oss_path",
    "image_oss_name",
    "label_embedding",
    "image_embedding",
    "status",
    "error_stage",
    "error_msg",
)
emb_dtypes = {column: "object" for column in emb_columns}

emb_result = emb_df.mf.apply_chunk(
    embedding_chunk,
    output_type="dataframe",
    dtypes=emb_dtypes,
)

# Write the embeddings to a MaxCompute table.
emb_write = md.to_odps_table(emb_result, "mf_video_embedding_result", overwrite=True)
emb_write.execute()
## 资源清理
# Show the LogView address one last time, then release the session.
final_logview = session.get_logview_address()
print(f"LogView: {final_logview}")

session.destroy()
print("Session destroyed.")