PDF 文本解析和百炼 Embedding#
背景信息#
论文、合同、研究报告、产品手册、白皮书等 PDF 是企业和研究机构常见的文档载体,承载着大量非结构化业务知识。原始 PDF 版式多样、长度不一、缺少语义切分,难以直接用于语义检索、RAG 问答、文档分类等下游场景。
本最佳实践演示如何基于 MaxFrame 和 MaxCompute DPE 引擎运行分布式 PDF 预处理流水线:抽取 PDF 正文文本、切分为语义片段、调用百炼 text-embedding-v4 模型生成句向量,并将结果特征表写回 MaxCompute。
适用场景#
文档知识库 RAG 问答。
论文、合同、研报的语义检索。
企业文档分类与聚类分析。
重复文档与近似段落检测。
面向检索、分析和数据闭环的文本特征资产沉淀。
核心处理流程#
前提条件#
# |
条件 |
说明 |
|---|---|---|
1 |
开通 MaxCompute |
需要一个可用的 MaxCompute 项目,并准备有效的 Access ID / Access Key。 |
2 |
开通 DPE 引擎 |
PDF 解析 UDF 和 |
3 |
上传 PDF 至 OSS |
已将原始 PDF 上传至目标 OSS bucket。 |
4 |
OSS RAM 角色授权 |
MaxFrame 通过 OSS 文件挂载读取 PDF,需要配置 Role ARN。 |
5 |
购买模型计算服务 |
调用 MaxCompute 托管 Embedding 模型需要模型计算服务承载推理流量。 |
6 |
MaxFrame SDK 版本 |
请使用 MaxFrame SDK 2.6.0 及以上版本( |
环境准备#
配置 ODPS 凭证、OSS 访问参数和 Embedding 模型。请将所有占位符替换为你自己的项目配置。
ODPS_ACCESS_ID = "<your_access_id>"
ODPS_ACCESS_KEY = "<your_access_key>"
ODPS_PROJECT = "<your_mc_project>"
ODPS_ENDPOINT = "https://service.<region>.maxcompute.aliyun.com/api"
OUTPUT_TABLE = "document_embedding_pipeline_results"
OSS_BUCKET_NAME = "<your_oss_bucket>"
OSS_ENDPOINT = "oss-<region>.aliyuncs.com"
OSS_DATA_PREFIX = "documents"
OSS_STORAGE_OPTIONS = {"role_arn": "<your_role_arn>"}
EMBED_MODEL_ID = "text-embedding-v4"
EMBED_MODEL_PROJECT = "bigdata_public_modelset"
CHUNK_SIZE = 2048
CHUNK_OVERLAP = 200
在 DPE 上打开 MaxFrame session:
import maxframe
import pandas as pd
import maxframe.dataframe as md
from maxframe import new_session
from maxframe.config import options
from maxframe.udf import with_fs_mount, with_python_requirements, with_running_options
from odps import ODPS
o = ODPS(
access_id=ODPS_ACCESS_ID,
secret_access_key=ODPS_ACCESS_KEY,
project=ODPS_PROJECT,
endpoint=ODPS_ENDPOINT,
)
options.dag.settings = {
"engine_order": ["DPE"],
"unavailable_engines": ["MCSQL", "SPE"],
}
options.session.gu_quota_name = "<your_gu_quota_name>"
session = new_session(o)
print(f"Session ID : {session.session_id}")
print(f"LogView : {session.get_logview_address()}")
步骤 1:准备 PDF 路径#
提供 PDF 在 OSS bucket 下的相对路径。UDF 会将 bucket 挂载到 /mnt/oss,并以 /mnt/oss/<pdf_path> 的形式打开文件。
PDF_PATHS = [
"documents/attention_is_all_you_need.pdf",
"documents/bert.pdf",
"documents/gpt3.pdf",
"documents/llama.pdf",
"documents/llama2.pdf",
]
paths_df = md.DataFrame(pd.DataFrame({"pdf_path": PDF_PATHS}))
步骤 2:解析 PDF 并切片#
UDF 职责单一:使用 pymupdf 抽取页面文本,并通过 RecursiveCharacterTextSplitter 按段落和句子边界切分。输出 schema 为 pdf_path、page_number 和 chunk_text。
@with_python_requirements("pymupdf", "langchain-text-splitters")
@with_running_options(engine="dpe", cpu=8, memory=16)
@with_fs_mount(
f"oss://{OSS_ENDPOINT}/{OSS_BUCKET_NAME}/",
"/mnt/oss",
storage_options=OSS_STORAGE_OPTIONS,
)
def extract_chunks(chunk):
"""Extract text and chunk each PDF into row-level records."""
import os
import pymupdf
from langchain_text_splitters import RecursiveCharacterTextSplitter
splitter = RecursiveCharacterTextSplitter(
chunk_size=CHUNK_SIZE,
chunk_overlap=CHUNK_OVERLAP,
separators=["\n\n", "\n", "。", "!", "?", ". ", "! ", "? ", " ", ""],
keep_separator=True,
)
rows = []
for pdf_path in chunk["pdf_path"].tolist():
doc = pymupdf.Document(os.path.join("/mnt/oss", pdf_path))
for page in doc:
page_text = page.get_text()
if not page_text or not page_text.strip():
continue
for chunk_text in splitter.split_text(page_text):
rows.append(
{
"pdf_path": pdf_path,
"page_number": page.number,
"chunk_text": chunk_text,
}
)
return pd.DataFrame(
rows,
columns=["pdf_path", "page_number", "chunk_text"],
)
chunks_df = paths_df.mf.apply_chunk(
extract_chunks,
output_type="dataframe",
dtypes=pd.Series(
{
"pdf_path": "object",
"page_number": "int64",
"chunk_text": "object",
}
),
)
chunks_df.execute().fetch()
步骤 3:调用百炼生成 Embedding#
通过 read_odps_model 从 MaxCompute 加载百炼公开 text-embedding-v4 模型,并对 chunk_text 列批量生成向量。Embedding 调用由 MaxCompute 模型计算服务托管完成。
available_models = list(o.list_models(project=EMBED_MODEL_PROJECT))
[m.name for m in available_models if m.name == EMBED_MODEL_ID]
from maxframe.learn.utils import read_odps_model
llm = read_odps_model(EMBED_MODEL_ID, project=EMBED_MODEL_PROJECT)
embeddings = llm.embed(
chunks_df["chunk_text"],
running_options={"max_tokens": 1024, "verbose": True},
# By default the response DataFrame includes provider response
# metadata. ``simple_output=True`` returns the embedding data directly.
simple_output=True,
)
# Use the ``response`` column as the raw embedding JSON.
result_df = chunks_df.assign(embedding=embeddings["response"])
result_df.execute().fetch()
步骤 4:转换 Embedding JSON#
百炼 embed() 返回形如 {{"data": [{{"embedding": [...]}}], ...}} 的 JSON 字符串。将其转换为扁平浮点数组 JSON 字符串,便于下游检索和相似度计算任务直接读取向量列。
def parse_embedding(s):
"""Extract the raw Bailian embedding JSON as a flat float-array JSON string."""
import json
if s is None:
return None
return json.dumps(json.loads(s)["data"][0]["embedding"])
result_df = result_df.assign(
embedding=result_df["embedding"].map(parse_embedding, dtype="object")
)
result_df["embedding"].execute().fetch()
步骤 5:写入结果表#
将处理后的文本片段和 Embedding 持久化到 MaxCompute。
md.to_odps_table(result_df, OUTPUT_TABLE, overwrite=True).execute()
print(f"Result written to {OUTPUT_TABLE}")
作业完成后,释放 MaxFrame session:
session.destroy()
常见问题排查#
问题 |
原因 |
解决方案 |
|---|---|---|
|
项目未开通 DPE。 |
联系管理员开通 DPE 引擎。 |
|
RAM 角色配置不正确。 |
检查 |
|
PDF 文件损坏或已加密。 |
预先过滤不可读 PDF,或在 UDF 中捕获异常并跳过失败文件。 |
写表失败或列类型报错 |
|
对齐 |
|
Quota 与 MaxCompute 项目不在同一地域。 |
确认 |