当前位置: 首页 > news >正文

多模态RAG赛题实战--Datawhale AI夏令营

参考自科大讯飞AI大赛(多模态RAG方向) - Datawhale

赛题意义:

我们正处在一个信息爆炸的时代,但这些信息并非以整洁的纯文本形式存在。它们被封装在各种各样的载体中:公司的年度财报、市场研究报告、产品手册、学术论文以及无数的网页。这些载体的共同特点是 图文混排 ——文字、图表、照片、流程图等元素交织在一起,共同承载着完整的信息。

传统的AI技术,如搜索引擎或基于文本的问答系统,在处理这类复杂文档时显得力不从心。它们能很好地理解文字,但对于图表中蕴含的趋势、数据和关系却是“视而不见”的。这就造成了一个巨大的信息鸿沟:AI无法回答那些需要结合视觉内容才能解决的问题,例如“根据这张条形图,哪个产品的市场份额最高?”或“请解释一下这张流程图的工作原理”。

多模态技术方案选择:

基于图片描述 :对所有图片生成文本描述,将这些描述与原文的文本块统一处理。这能将多模态问题简化为纯文本问题,最适合快速构建Baseline。

具体工具栈:

PDF解析 :这个环节我们选择的mineru,但是task1里面为了降低大家的学习门槛,使用的是pymupdf作为平替方案。
Embedding实现 :我最初考虑使用 sentence-transformer 库。但在进一步查阅资料时,我发现了 Xinference  ,它能将模型部署为服务,并通过兼容OpenAI的API来调用。我立即决定采用这种方式,因为 服务化能让我的Embedding模块与主应用逻辑解耦,更利于调试和未来的扩展。


Baseline执行流程:

时间消耗问题:

解决 : 不要在一个脚本里完成所有事 。强烈建议使用 Jupyter Notebook 进行开发调试,并将流程拆分:

  1. 第一阶段:解析 。在一个Notebook中,专门负责调用pymupdf,将所有PDF解析为JSON并 保存到本地 。这个阶段成功运行一次后,就不再需要重复执行。

  2. 第二阶段:预处理与Embedding 。在另一个Notebook中,读取第一步生成的JSON文件,进行图片描述生成、数据清洗,并调用Embedding模型。将最终包含向量的知识库 保存为持久化文件

  3. 第三阶段:检索与生成 。在第三个Notebook中,加载第二步保存好的知识库,专注于调试检索逻辑和Prompt工程。

通过这种 分步执行、缓存中间结果 的方式,可以极大地提高调试效率,每次修改只需运行对应的、耗时较短的模块。

核心问题:

  1. LLM需要什么样的信息? 检索模块是LLM的眼睛和耳朵。我们是应该只把和问题最相似的那一小块知识喂给LLM,还是应该提供更丰富的周边信息?例如,找到一个关键段落后,是否应该把它的上下文(前后段落、所属章节标题)也一并提供,来帮助LLM更好地理解?

Baseline代码:

1、将压缩包解压到指定目录:

unzip -n datas/财报数据库.zip -d datas/
  • -n不覆盖已存在的文件(已有文件会跳过解压)

  • datas/财报数据库.zip → 压缩包位置

  • -d datas/ → 解压到 datas/ 目录

2、文档分块(没有涉及多模态的理解,对图表类也仅仅是提取文本)

批量遍历./datas目录下(含子目录,即./datas/财报数据库)的所有 PDF 文件,然后用 PyMuPDF(fitz逐页提取文本内容,最终把每一页的文字和相关信息整理成 JSON 文件保存到all_pdf_page_chunks.json中(即按页的方式来分块),即我们RAG系统的最终知识库(而非向量数据库)。

fitz_pipeline_all.py:

import fitz  # PyMuPDF,用于操作 PDF 文件(提取文字、图片等)
import json
from pathlib import Path  # Python 标准库,用于更方便地处理路径def process_pdfs_to_chunks(datas_dir: Path, output_json_path: Path):"""扫描指定目录下的所有 PDF 文件,将每个 PDF 按页提取文本内容,转换成“页面数据块(chunk)”,最终统一保存到一个 JSON 文件中。Args:datas_dir (Path): 存放 PDF 文件的目录(可以包含子目录)。output_json_path (Path): 保存最终 JSON 文件的路径。"""all_chunks = []  # 用于存储所有 PDF 页面的数据块# 递归搜索 datas_dir 目录下的所有 .pdf 文件pdf_files = list(datas_dir.rglob('*.pdf'))if not pdf_files:print(f"警告:在目录 '{datas_dir}' 中未找到任何 PDF 文件。")returnprint(f"找到 {len(pdf_files)} 个 PDF 文件,开始处理...")# 遍历找到的每一个 PDF 文件for pdf_path in pdf_files:file_name_stem = pdf_path.stem  # 文件名(不包含扩展名)full_file_name = pdf_path.name  # 完整文件名(含扩展名)print(f"  - 正在处理: {full_file_name}")try:# 打开 PDF 文件(with 确保用完后自动关闭)with fitz.open(pdf_path) as doc:# 遍历 PDF 的每一页for page_idx, page in enumerate(doc):# 提取当前页面的文字("text" 模式只取文本,不含图片)content = page.get_text("text")# 如果该页没有任何文字,则跳过if not content.strip():continue# 构造当前页面的“数据块”字典chunk = {"id": f"{file_name_stem}_page_{page_idx}",  # 唯一 ID(文件名 + 页码)"content": content,  # 当前页的文字内容"metadata": {  # 元信息"page": page_idx,  # 页码(从 0 开始)"file_name": full_file_name  # 源文件名}}all_chunks.append(chunk)  # 添加到总列表中except Exception as e:# 如果某个 PDF 处理出错,则打印错误信息并继续处理下一个print(f"处理文件 '{pdf_path}' 时发生错误: {e}")# 确保输出目录存在(没有就创建)output_json_path.parent.mkdir(parents=True, exist_ok=True)# 将收集到的所有数据块写入 JSON 文件with open(output_json_path, 'w', encoding='utf-8') as f:json.dump(all_chunks, f, ensure_ascii=False, indent=2)print(f"\n处理完成!所有内容已保存至: {output_json_path}")def main():# 获取当前脚本所在目录base_dir = Path(__file__).parent# 假设 PDF 数据在当前目录的 datas 文件夹中datas_dir = base_dir / 'datas'# 生成的 JSON 存放在当前目录,文件名为 all_pdf_page_chunks.jsonchunk_json_path = base_dir / 'all_pdf_page_chunks.json'# 执行 PDF 批量处理process_pdfs_to_chunks(datas_dir, chunk_json_path)if __name__ == '__main__':main()

两层循环,所有pdf文档进行循环,然后是一个pdf文档中所有页进行循环,最终JSON文件以每一页内容为单位进行记录。(即从一个pdf文件的第一页开始,直到最后一个pdf的最后一页),即按页的方式来分块。

每一个chunk(或者说每一页)字典输出格式:
                    chunk = {
                        "id": f"{file_name_stem}_page_{page_idx}",
                        "content": content,
                        "metadata": {
                            "page": page_idx,  # 0-based page index
                            "file_name": full_file_name
                        }
                    }

如下所示:

[{"id": "file1_page_0","content": "第一页的文字内容...","metadata": {"page": 0,"file_name": "file1.pdf"}},{"id": "file1_page_1","content": "第二页的文字内容...","metadata": {"page": 1,"file_name": "file1.pdf"}},{"id": "file2_page_0","content": "第一页的文字内容...","metadata": {"page": 0,"file_name": "file2.pdf"}}
]

3、构建一个简易的多文档 RAG系统

步骤如下:

  • 读取 PDF 页内容的 JSON 文件(之前我们提取的 all_pdf_page_chunks.json)。

  • 调用本地的 embedding 接口把每个页面内容(键content对应的值)转成向量:embeddings = self.embedding_model.embed_texts([c['content'] for c in chunks])

  • 把向量和对应的chunk(page)一起存进一个简单的向量库SimpleVectorStore,本Baseline为了简化,用一个内存列表( SimpleVectorStore )模拟了向量数据库的功能,可以看详细代码直接定义了两个列表去储存,也就是直接在内存中存 embeddings + chunks,然后用 numpy 实现余弦相似度搜索):self.vector_store.add_chunks(chunks, embeddings)

  • 接收一个问题 → 检索最相似的页面 → 调用大模型生成结构化回答(JSON 格式 answer / filename / page),检索步骤如下:

    generate_answer():先检索回Top-k个chunk,再把所有召回的chunk依次拼接成上下文(文件名 + 页码 + 内容)全部join起来成为一个大的上下文context;生成一个 prompt,要求模型用 JSON 格式返回;调用本地的大模型 API(LOCAL_TEXT_MODEL);用 extract_json_array 解析模型输出的 JSON,如果解析失败,就用原始文本+第一条检索结果的文件名和页码;返回结构化字典:question, answer, filename, page, retrieval_chunks

  • prompt如下:

    prompt = (f"你是一名专业的金融分析助手,请根据以下检索到的内容回答用户问题。\n"f"请严格按照如下JSON格式输出:\n"f'{{"answer": "你的简洁回答", "filename": "来源文件名", "page": "来源页码"}}'"\n"f"检索内容:\n{context}\n\n问题:{question}\n"f"请确保输出内容为合法JSON字符串,不要输出多余内容。")

  • 支持批量测试,可从 datas/test.json 读取一堆问题并并发处理,最后输出两个结果文件。rag_top1_pred_raw.json:原始结果(包含 retrieval_chunks 和原始 idx)rag_top1_pred.json:过滤结果(去掉 retrieval_chunks,未回答的补空白),用于评测结果的提交,包含test.json里面的全部问题和针对其中10个问题的答案。

  • 采用线程池并发调用的方式:with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:,因为在RAG检索和LLM回答的瓶颈不是 CPU 计算,而是 IO 等待(网络延迟 + 模型响应时间),而现在一次性有10个问题,因此线程池并发而不是顺序执行的方式,把多个题目同时处理,大幅减少总耗时。

rag_from_page_chunks.py:

import json
import osimport hashlib
from typing import List, Dict, Any
from tqdm import tqdm
import sys
import concurrent.futures
import randomfrom get_text_embedding import get_text_embeddingfrom dotenv import load_dotenv
from openai import OpenAI
# 统一加载项目根目录的.env
load_dotenv()class PageChunkLoader:def __init__(self, json_path: str):self.json_path = json_pathdef load_chunks(self) -> List[Dict[str, Any]]:with open(self.json_path, 'r', encoding='utf-8') as f:return json.load(f)class EmbeddingModel:def __init__(self, batch_size: int = 64):self.api_key = os.getenv('LOCAL_API_KEY')self.base_url = os.getenv('LOCAL_BASE_URL')self.embedding_model = os.getenv('LOCAL_EMBEDDING_MODEL')self.batch_size = batch_sizeif not self.api_key or not self.base_url:raise ValueError('请在.env中配置LOCAL_API_KEY和LOCAL_BASE_URL')def embed_texts(self, texts: List[str]) -> List[List[float]]:return get_text_embedding(texts,api_key=self.api_key,base_url=self.base_url,embedding_model=self.embedding_model,batch_size=self.batch_size)def embed_text(self, text: str) -> List[float]:return self.embed_texts([text])[0]class SimpleVectorStore:def __init__(self):self.embeddings = []self.chunks = []
# 直接内存中存 embeddings + chunksdef add_chunks(self, chunks: List[Dict[str, Any]], embeddings: List[List[float]]):self.chunks.extend(chunks)self.embeddings.extend(embeddings)def search(self, query_embedding: List[float], top_k: int = 3) -> List[Dict[str, Any]]:from numpy import dotfrom numpy.linalg import normimport numpy as npif not self.embeddings:return []emb_matrix = np.array(self.embeddings)query_emb = np.array(query_embedding)sims = emb_matrix @ query_emb / (norm(emb_matrix, axis=1) * norm(query_emb) + 1e-8)idxs = sims.argsort()[::-1][:top_k]return [self.chunks[i] for i in idxs]class SimpleRAG:def __init__(self, chunk_json_path: str, model_path: str = None, batch_size: int = 32):self.loader = PageChunkLoader(chunk_json_path)self.embedding_model = EmbeddingModel(batch_size=batch_size)self.vector_store = SimpleVectorStore()def setup(self):print("加载所有页chunk...")chunks = self.loader.load_chunks()print(f"共加载 {len(chunks)} 个chunk")print("生成嵌入...")embeddings = self.embedding_model.embed_texts([c['content'] for c in chunks])print("存储向量...")self.vector_store.add_chunks(chunks, embeddings)print("RAG向量库构建完成!")def query(self, question: str, top_k: int = 3) -> Dict[str, Any]:q_emb = self.embedding_model.embed_text(question)results = self.vector_store.search(q_emb, top_k)return {"question": question,"chunks": results}def generate_answer(self, question: str, top_k: int = 3) -> Dict[str, Any]:"""检索+大模型生成式回答,返回结构化结果"""qwen_api_key = os.getenv('LOCAL_API_KEY')qwen_base_url = os.getenv('LOCAL_BASE_URL')qwen_model = os.getenv('LOCAL_TEXT_MODEL')if not qwen_api_key or not qwen_base_url or not qwen_model:raise ValueError('请在.env中配置LOCAL_API_KEY、LOCAL_BASE_URL、LOCAL_TEXT_MODEL')q_emb = self.embedding_model.embed_text(question)chunks = self.vector_store.search(q_emb, top_k)# 拼接检索内容,带上元数据context = "\n".join([f"[文件名]{c['metadata']['file_name']} [页码]{c['metadata']['page']}\n{c['content']}" for c in chunks])# 明确要求输出JSON格式 answer/page/filenameprompt = (f"你是一名专业的金融分析助手,请根据以下检索到的内容回答用户问题。\n"f"请严格按照如下JSON格式输出:\n"f'{{"answer": "你的简洁回答", "filename": "来源文件名", "page": "来源页码"}}'"\n"f"检索内容:\n{context}\n\n问题:{question}\n"f"请确保输出内容为合法JSON字符串,不要输出多余内容。")client = OpenAI(api_key=qwen_api_key, base_url=qwen_base_url)completion = client.chat.completions.create(model=qwen_model,messages=[{"role": "system", "content": "你是一名专业的金融分析助手。"},{"role": "user", "content": prompt}],temperature=0.2,max_tokens=1024)import json as pyjsonfrom extract_json_array import extract_json_arrayraw = completion.choices[0].message.content.strip()# 用 extract_json_array 提取 JSON 对象json_str = extract_json_array(raw, mode='objects')if json_str:try:arr = pyjson.loads(json_str)# 只取第一个对象if isinstance(arr, list) and arr:j = arr[0]answer = j.get('answer', '')filename = j.get('filename', '')page = j.get('page', '')else:answer = rawfilename = chunks[0]['metadata']['file_name'] if chunks else ''page = chunks[0]['metadata']['page'] if chunks else ''except Exception:answer = rawfilename = chunks[0]['metadata']['file_name'] if chunks else ''page = chunks[0]['metadata']['page'] if chunks else ''else:answer = rawfilename = chunks[0]['metadata']['file_name'] if chunks else ''page = chunks[0]['metadata']['page'] if chunks else ''# 结构化输出return {"question": question,"answer": answer,"filename": filename,"page": page,"retrieval_chunks": chunks}if __name__ == '__main__':# 路径可根据实际情况调整chunk_json_path = "./all_pdf_page_chunks.json"rag = SimpleRAG(chunk_json_path)rag.setup()# 控制测试时读取的题目数量,默认只随机抽取10个,实际跑全部时设为NoneTEST_SAMPLE_NUM = 10  # 设置为None则全部跑FILL_UNANSWERED = True  # 未回答的也输出默认内容# 批量评测脚本:读取测试集,检索+大模型生成,输出结构化结果test_path = "./datas/test.json"if os.path.exists(test_path):with open(test_path, 'r', encoding='utf-8') as f:test_data = json.load(f)# 记录所有原始索引all_indices = list(range(len(test_data)))# 随机抽取部分题目用于测试selected_indices = all_indicesif TEST_SAMPLE_NUM is not None and TEST_SAMPLE_NUM > 0:if len(test_data) > TEST_SAMPLE_NUM:selected_indices = sorted(random.sample(all_indices, TEST_SAMPLE_NUM))def process_one(idx):item = test_data[idx]question = item['question']tqdm.write(f"[{selected_indices.index(idx)+1}/{len(selected_indices)}] 正在处理: {question[:30]}...")result = rag.generate_answer(question, top_k=5)return idx, resultresults = []if selected_indices:with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:results = list(tqdm(executor.map(process_one, selected_indices), total=len(selected_indices), desc='并发批量生成'))# 先输出一份未过滤的原始结果(含 idx)raw_out_path = "./rag_top1_pred_raw.json"with open(raw_out_path, 'w', encoding='utf-8') as f:json.dump(results, f, ensure_ascii=False, indent=2)print(f'已输出原始未过滤结果到: {raw_out_path}')# 只保留结果部分,并去除 retrieval_chunks 字段idx2result = {idx: {k: v for k, v in r.items() if k != 'retrieval_chunks'} for idx, r in results}filtered_results = []for idx, item in enumerate(test_data):if idx in idx2result:filtered_results.append(idx2result[idx])elif FILL_UNANSWERED:# 未被回答的,补默认内容filtered_results.append({"question": item.get("question", ""),"answer": "","filename": "","page": "",})# 输出结构化结果到jsonout_path = "./rag_top1_pred.json"with open(out_path, 'w', encoding='utf-8') as f:json.dump(filtered_results, f, ensure_ascii=False, indent=2)print(f'已输出结构化检索+大模型生成结果到: {out_path}')else:print("datas/test.json 不存在")

目前Baseline存在的问题:

1、在第二步的文档解析(没有涉及多模态的理解,对图表类也仅仅是提取文本,即目前采用的是图像文本化)?这个方法会造成信息损失 :没有提取图片里面的内容。

2、Embedding那里针对4263个chunk(page)共耗时3分钟,在做向量相似度的search的时候耗时11分钟(随机抽取 TEST_SAMPLE_NUM=10 个问题,并做了并发调用),每个问题召回的chunk数目TOP-K是5,如何进行加速?

3、将文档按照页面切分成独立的块进行检索,可能会破坏原文中段落与段落、段落与图表之间的上下文关联。检索出的知识块可能是孤立的,缺乏上下文。

4、检索策略单一 :仅基于语义相似度的检索,对于一些包含特定关键词或需要大范围信息整合的问题,可能不是最优解。

提分策略:

  1. 分块策略 (Chunking Strategy) :

    • 目前是按“页”分块,这样做简单但粗糙。是否可以尝试更细粒度的分块,比如按段落、甚至固定长度的句子分块?这会如何影响检索的精度和召回率?

    • 如何处理跨越多个块的表格或段落?是否可以引入重叠(Overlap)分块的策略?

  2. 检索优化 (Retrieval Optimization) :

    • 当前的Top-K检索策略很简单。如果检索回来的5个块中,只有1个是真正相关的,其他4个都是噪音,这会严重干扰LLM。如何提高检索结果的信噪比?

    • 可以引入重排(Re-ranking)模型吗?即在初步检索(召回)出20个候选块后,用一个更强的模型对这20个块进行重新排序,选出最相关的5个。

  3. Prompt工程 (Prompt Engineering) :

    • rag_from_page_chunks.py 中的Prompt是整个生成环节的灵魂。你能设计出更好的Prompt吗?

    • 比如,如何更清晰地指示LLM在多个来源中选择最精确的那一个?如何让它在信息不足时回答“根据现有信息无法回答”,而不是产生幻觉?

  4. 多模态融合 (Multimodal Fusion) :

    • “图片->文字描述”的方案有信息损失。有没有办法做得更好?

    • 可以尝试 多路召回 吗?即文本问题同时去检索文本库和图片库(使用CLIP等多模态向量模型),将检索到的文本和图片信息都提供给一个多模态大模型(如Qwen-VL),让它自己去融合信息并作答。

  5. 升级数据解析方案:从 fitz MinerU

  • 这是至关重要的进阶环节。基础方案所使用的 fitz_pipeline_all.py 仅能提取文本,会遗漏表格、图片等关键信息。

提升攻略1:Prompt工程设计

1、明确的来源选择指令

告诉 LLM,即使有多个来源,也要进行判断和筛选,选择质量最高的那个来作为最终的引用。

prompt = (f"你是一名专业的金融分析助手,请根据以下检索到的内容回答用户问题。\n"f"---检索内容---\n"f"{context}\n"f"---用户问题---\n"f"{question}\n"f"请严格遵循以下规则:\n"f"1. **只使用**上述检索内容中的信息进行回答,不要使用你的外部知识。\n"f"2. 如果有多个来源(文件名和页码)提供了相似或互补的信息,请综合它们。但在最终回答中,**只引用最直接、最完整的那个来源**的文件名和页码。\n"f"3. 你的回答必须是简洁、精确的,并且直接引用或概括检索到的内容。\n"f"4. 严格按照如下JSON格式输出,不要输出多余内容:\n"f'{{"answer": "你的简洁回答", "filename": "来源文件名", "page": "来源页码"}}'
)

2、处理信息不足的策略

防止 LLM 在信息不足时产生幻觉是 RAG 系统设计中一个重要挑战,与其让它“编造”一个看似合理的答案,不如明确告诉它在这种情况下该怎么做,比如在 Prompt 中加入无法回答的指令:

prompt = (# ... (前面的 Prompt 内容不变) ...f"请严格遵循以下规则:\n"f"1. **只使用**上述检索内容中的信息进行回答,不要使用你的外部知识。\n"f"2. 如果检索内容中**没有足够的信息来回答问题**,请直接在 'answer' 字段中回答:'根据现有信息无法回答'。此时,`filename` 和 `page` 字段都应为空。\n"f"3. 如果有多个来源... (其余规则不变) ..."
)

http://www.lryc.cn/news/615213.html

相关文章:

  • 如何分析需求的可行性
  • 生产环境某业务服务JVM调优总结
  • 避免在微信小程序中频繁使用setData方法
  • 扩散LLM推理新范式:打破生成长度限制,实现动态自适应调节
  • 机器学习——09 聚类算法
  • BGP 协议笔记
  • 使用qemu运行与GDB调试内核
  • 微软推出革命性AI安全工具Project IRE,重塑网络安全防御新范式
  • 用天气预测理解分类算法-从出门看天气到逻辑回归
  • Kubernetes(K8s)不同行业的典型应用场景及价值分析 原创
  • windows、linux应急响应入侵排查
  • Qdrant Filtering:must / should / must_not 全解析(含 Python 实操)
  • 【2025】Datawhale AI夏令营-多模态RAG-Task1、Task2笔记-任务理解与Baseline代码解读
  • 金融通用智能体(Financial General Agent, FGA)的端到端解决方案
  • 机器翻译中的语言学基础详解(包括包括语法、句法和语义学等)
  • C语言:构造类型
  • TDengine IDMP 产品基本概念
  • 使用 Visual Studio 2022 编译 PortAudio 项目
  • occworld(1):论文解读
  • Ghost备份分区设置分包大小方法
  • 任务发布悬赏查询管理地推抖音快手微信任务赚佣金网站源码功能详解二开
  • 谷歌警告云存储桶劫持攻击
  • 让大模型 “睡觉”:把版本迭代当作人类睡眠来设计(附可直接改造的训练作息表与代码)
  • n沟道增强型mos管
  • B.10.01.6-DDD领域驱动设计:从理论到落地的完整指南
  • Typora上传图片保存到assets目录下
  • 第十四届蓝桥杯青少年组省赛 编程题真题题解
  • stm32项目(24)——基于STM32的汽车CAN通信系统
  • WinForm 复合控件(用户控件):创建与使用指南
  • 深入 FastMCP 源码:认识 tool()、resource() 和 prompt() 装饰器