PDF 文本解析和百炼 Embedding#

Available at MaxFrame 2.6.0

背景信息#

论文、合同、研究报告、产品手册、白皮书等 PDF 是企业和研究机构常见的文档载体,承载着大量非结构化业务知识。原始 PDF 版式多样、长度不一、缺少语义切分,难以直接用于语义检索、RAG 问答、文档分类等下游场景。

本最佳实践演示如何基于 MaxFrame 和 MaxCompute DPE 引擎运行分布式 PDF 预处理流水线:抽取 PDF 正文文本、切分为语义片段、调用百炼 text-embedding-v4 模型生成句向量,并将结果特征表写回 MaxCompute。

适用场景#

  • 文档知识库 RAG 问答。

  • 论文、合同、研报的语义检索。

  • 企业文档分类与聚类分析。

  • 重复文档与近似段落检测。

  • 面向检索、分析和数据闭环的文本特征资产沉淀。

核心处理流程#

PDF parsing and Bailian embedding workflow

前提条件#

#

条件

说明

1

开通 MaxCompute

需要一个可用的 MaxCompute 项目,并准备有效的 Access ID / Access Key。

2

开通 DPE 引擎

PDF 解析 UDF 和 apply_chunk 需要在 DPE 上执行。

3

上传 PDF 至 OSS

已将原始 PDF 上传至目标 OSS bucket。

4

OSS RAM 角色授权

MaxFrame 通过 OSS 文件挂载读取 PDF,需要配置 Role ARN。

5

购买模型计算服务

调用 MaxCompute 托管 Embedding 模型需要模型计算服务承载推理流量。

6

MaxFrame SDK 版本

请使用 MaxFrame SDK 2.6.0 及以上版本(pip install maxframe>=2.6.0)。

环境准备#

配置 ODPS 凭证、OSS 访问参数和 Embedding 模型。请将所有占位符替换为你自己的项目配置。

ODPS_ACCESS_ID = "<your_access_id>"
ODPS_ACCESS_KEY = "<your_access_key>"
ODPS_PROJECT = "<your_mc_project>"
ODPS_ENDPOINT = "https://service.<region>.maxcompute.aliyun.com/api"
OUTPUT_TABLE = "document_embedding_pipeline_results"

OSS_BUCKET_NAME = "<your_oss_bucket>"
OSS_ENDPOINT = "oss-<region>.aliyuncs.com"
OSS_DATA_PREFIX = "documents"
OSS_STORAGE_OPTIONS = {"role_arn": "<your_role_arn>"}

EMBED_MODEL_ID = "text-embedding-v4"
EMBED_MODEL_PROJECT = "bigdata_public_modelset"

CHUNK_SIZE = 2048
CHUNK_OVERLAP = 200

在 DPE 上打开 MaxFrame session:

import maxframe
import pandas as pd
import maxframe.dataframe as md
from maxframe import new_session
from maxframe.config import options
from maxframe.udf import with_fs_mount, with_python_requirements, with_running_options
from odps import ODPS

o = ODPS(
    access_id=ODPS_ACCESS_ID,
    secret_access_key=ODPS_ACCESS_KEY,
    project=ODPS_PROJECT,
    endpoint=ODPS_ENDPOINT,
)

options.dag.settings = {
    "engine_order": ["DPE"],
    "unavailable_engines": ["MCSQL", "SPE"],
}
options.session.gu_quota_name = "<your_gu_quota_name>"

session = new_session(o)
print(f"Session ID : {session.session_id}")
print(f"LogView    : {session.get_logview_address()}")

步骤 1:准备 PDF 路径#

提供 PDF 在 OSS bucket 下的相对路径。UDF 会将 bucket 挂载到 /mnt/oss,并以 /mnt/oss/<pdf_path> 的形式打开文件。

PDF_PATHS = [
    "documents/attention_is_all_you_need.pdf",
    "documents/bert.pdf",
    "documents/gpt3.pdf",
    "documents/llama.pdf",
    "documents/llama2.pdf",
]

paths_df = md.DataFrame(pd.DataFrame({"pdf_path": PDF_PATHS}))

步骤 2:解析 PDF 并切片#

UDF 职责单一:使用 pymupdf 抽取页面文本,并通过 RecursiveCharacterTextSplitter 按段落和句子边界切分。输出 schema 为 pdf_pathpage_numberchunk_text

@with_python_requirements("pymupdf", "langchain-text-splitters")
@with_running_options(engine="dpe", cpu=8, memory=16)
@with_fs_mount(
    f"oss://{OSS_ENDPOINT}/{OSS_BUCKET_NAME}/",
    "/mnt/oss",
    storage_options=OSS_STORAGE_OPTIONS,
)
def extract_chunks(chunk):
    """Extract text and chunk each PDF into row-level records."""
    import os

    import pymupdf
    from langchain_text_splitters import RecursiveCharacterTextSplitter

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        separators=["\n\n", "\n", "。", "!", "?", ". ", "! ", "? ", " ", ""],
        keep_separator=True,
    )

    rows = []
    for pdf_path in chunk["pdf_path"].tolist():
        doc = pymupdf.Document(os.path.join("/mnt/oss", pdf_path))
        for page in doc:
            page_text = page.get_text()
            if not page_text or not page_text.strip():
                continue
            for chunk_text in splitter.split_text(page_text):
                rows.append(
                    {
                        "pdf_path": pdf_path,
                        "page_number": page.number,
                        "chunk_text": chunk_text,
                    }
                )

    return pd.DataFrame(
        rows,
        columns=["pdf_path", "page_number", "chunk_text"],
    )

chunks_df = paths_df.mf.apply_chunk(
    extract_chunks,
    output_type="dataframe",
    dtypes=pd.Series(
        {
            "pdf_path": "object",
            "page_number": "int64",
            "chunk_text": "object",
        }
    ),
)

chunks_df.execute().fetch()

步骤 3:调用百炼生成 Embedding#

通过 read_odps_model 从 MaxCompute 加载百炼公开 text-embedding-v4 模型,并对 chunk_text 列批量生成向量。Embedding 调用由 MaxCompute 模型计算服务托管完成。

available_models = list(o.list_models(project=EMBED_MODEL_PROJECT))
[m.name for m in available_models if m.name == EMBED_MODEL_ID]
from maxframe.learn.utils import read_odps_model

llm = read_odps_model(EMBED_MODEL_ID, project=EMBED_MODEL_PROJECT)
embeddings = llm.embed(
    chunks_df["chunk_text"],
    running_options={"max_tokens": 1024, "verbose": True},
    # By default the response DataFrame includes provider response
    # metadata. ``simple_output=True`` returns the embedding data directly.
    simple_output=True,
)

# Use the ``response`` column as the raw embedding JSON.
result_df = chunks_df.assign(embedding=embeddings["response"])
result_df.execute().fetch()

步骤 4:转换 Embedding JSON#

百炼 embed() 返回形如 {{"data": [{{"embedding": [...]}}], ...}} 的 JSON 字符串。将其转换为扁平浮点数组 JSON 字符串,便于下游检索和相似度计算任务直接读取向量列。

def parse_embedding(s):
    """Extract the raw Bailian embedding JSON as a flat float-array JSON string."""
    import json

    if s is None:
        return None
    return json.dumps(json.loads(s)["data"][0]["embedding"])

result_df = result_df.assign(
    embedding=result_df["embedding"].map(parse_embedding, dtype="object")
)

result_df["embedding"].execute().fetch()

步骤 5:写入结果表#

将处理后的文本片段和 Embedding 持久化到 MaxCompute。

md.to_odps_table(result_df, OUTPUT_TABLE, overwrite=True).execute()
print(f"Result written to {OUTPUT_TABLE}")

作业完成后,释放 MaxFrame session:

session.destroy()

常见问题排查#

问题

原因

解决方案

Engine DPE not available

项目未开通 DPE。

联系管理员开通 DPE 引擎。

OSS access denied

RAM 角色配置不正确。

检查 role_arn,并确认 RAM 角色已具备 OSS 读取权限。

pymupdf 读取 PDF 失败

PDF 文件损坏或已加密。

预先过滤不可读 PDF,或在 UDF 中捕获异常并跳过失败文件。

写表失败或列类型报错

apply_chunk 的 dtypes 与目标表 schema 不一致。

对齐 pdf_pathpage_numberchunk_textembedding 的列类型。

gu_quota 不可用

Quota 与 MaxCompute 项目不在同一地域。

确认 ODPS_ENDPOINToptions.session.gu_quota_name 使用同一地域。