[Meetily后端框架] 多模型-Pydantic AI 代理-统一抽象 | SQLite管理
第5章:人工智能模型交互(Pydantic-AI 代理)
欢迎回来!
在上一章第四章:文字记录处理逻辑中,我们学习了TranscriptProcessor
如何将冗长的会议记录分解为称为"块"的较小片段
,因为人工智能模型无法一次性处理整个文本。
我们还看到,对于每个块,处理器需要以特定格式获取结构化数据——例如行动项、决策和摘要——这些格式由我们在第三章中定义的Pydantic模型确定。
但如何实际与这些人工智能模型(如Claude、Groq、OpenAI或本地Ollama模型)对话,并可靠地获得符合我们Pydantic蓝图的输出?
这就是人工智能模型交互组件发挥作用的地方,它使用一个强大的小工具库**pydantic-ai
**。
Pydantic-AI代理
解决了什么问题?
想象我们是一个建筑师,拥有详细的房屋蓝图(我们的Pydantic模型)。我们雇佣不同的施工团队(人工智能模型)来建造它的各个部分。
面临的问题包括:
- 不同团队使用不同语言:每个施工团队使用略有不同的技术语言或工具(不同的人工智能模型API)。
如何以统一方式给出指令并接收标准化结果?
- 团队使用原始材料建造:团队使用原材料(文本)工作。我们需要他们严格按照蓝图组装这些材料,而不是随意堆砌木材和砖块。他们的自然输出只是文本,但我们需要结构化组件(例如具有特定尺寸的"墙"对象)。
- 错误不可避免:有时团队可能误解蓝图的某个细节并建造出略有偏差的部件。我们需要验证他们的工作成果并要求修正。
Pydantic-AI代理
充当我们的监理工头来处理这些问题。它通过pydantic-ai
库实现:
- 作为与不同AI模型对话的统一接口
- 明确告知AI模型预期的结构化输出格式(我们的Pydantic模型)
- 将AI的原始文本输出解析为符合Pydantic蓝图的Python对象
- 处理AI输出格式不完美的情况,有时会提示AI自我修正
这是确保AI原始文本响应转换为可用结构化数据的翻译器和验证器。
类比系统层的VFS
核心概念:代理
pydantic-ai
的核心是Agent
类。我们通过以下配置创建Agent
:
- AI模型(
llm
):指定使用的人工智能模型(Claude、Ollama等)。pydantic-ai
为不同模型提供连接类。 - 目标结构(
result_type
):期望AI输出符合的Pydantic模型(例如我们的SummaryResponse
模型)。
配置完成后,我们给Agent
一个提示(指令和文本块)并要求其运行
。Agent
会自动处理后续步骤,力求返回严格遵循result_type
的Pydantic模型对象。
在TranscriptProcessor
中使用代理
我们查看TranscriptProcessor
(backend/app/transcript_processor.py
)的简化代码片段,了解如何使用pydantic-ai
代理处理文本块:
# backend/app/transcript_processor.py (简化版)
from pydantic_ai import Agent # 导入代理
# 导入不同AI供应商的连接类
from pydantic_ai.models.ollama import OllamaModel
from pydantic_ai.models.anthropic import AnthropicModel
from pydantic_ai.models.groq import GroqModel
from pydantic_ai.models.openai import OpenAIModel # 新增支持
# 导入目标数据结构
from .transcript_processor import SummaryResponse # 我们的Pydantic模型class TranscriptProcessor:# ... __init__和其他方法...async def process_chunk_with_ai(self, chunk_text: str, model: str, model_name: str):"""使用AI代理处理单个块的辅助函数"""# 1. 根据'model'参数选择正确的AI模型连接器llm = None# (简化选择逻辑,实际代码从数据库获取API密钥)if model == "ollama":llm = OllamaModel(model_name=model_name)elif model == "claude":# 此处需要API密钥,为简洁省略llm = AnthropicModel(model_name=model_name, api_key="...")elif model == "groq":# 此处需要API密钥,为简洁省略llm = GroqModel(model_name=model_name, api_key="...")elif model == "openai":# 此处需要API密钥,为简洁省略llm = OpenAIModel(model_name=model_name, api_key="...")else:raise ValueError(f"不支持的模型: {model}")# 2. 创建Pydantic-AI代理agent = Agent(llm, # 使用的AI模型result_type=SummaryResponse, # 期望的结构(我们的蓝图!)result_retries=3 # 解析失败时重试次数)# 3. 准备提示(给AI的指令)prompt = f"""分析以下会议记录片段并提取关键信息。将输出格式化为严格遵循提供的Pydantic模式的JSON。如果某个部分在当前块中没有相关信息,请为其'blocks'返回空列表。会议记录片段:---{chunk_text}---"""# Pydantic-AI会自动将模式指令添加到提示中!# 4. 运行代理!summary_result = await agent.run(prompt)# 5. 结果应(希望)是有效的SummaryResponse对象# 从结果对象访问结构化数据# (不同pydantic-ai版本可能直接返回数据或嵌套数据)if hasattr(summary_result, 'data') and isinstance(summary_result.data, SummaryResponse):return summary_result.data # 返回Pydantic对象elif isinstance(summary_result, SummaryResponse):return summary_result # 直接返回Pydantic对象else:# 处理意外结果 - 记录错误或返回Noneprint(f"警告:代理返回意外类型: {type(summary_result)}")return None # 或抛出错误
解析:
- 选择模型连接器:根据用户选择的AI模型(如"ollama"、“claude”)创建对应的连接对象(
OllamaModel
、AnthropicModel
等)。这些对象知道如何与特定AI供应商的API通信。(注:实际代码从数据库管理系统获取API密钥,此处为简洁省略)。 - 创建代理:实例化
Agent
,传入模型连接器(llm
)和关键的result_type=SummaryResponse
。这告知代理"无论AI返回什么文本,确保可转换为SummaryResponse
Pydantic对象"。result_retries
设置解析失败时的重试次数。 - 准备提示:编写清晰的AI指令,包含实际的
chunk_text
。明确要求AI生成符合目标结构的JSON。关键点:当为Agent
设置result_type
时,pydantic-ai
会自动将SummaryResponse
模型生成的JSON模式附加到提示中,这是AI知晓所需格式的关键! - 运行代理:调用
await agent.run(prompt)
。代理将提示(包含附加模式指令)发送给AI模型,等待文本响应,并尝试解析验证。 - 获取结构化输出:成功时返回
SummaryResponse
Pydantic对象。该对象保证符合蓝图(否则代理会抛出错误)。
这个process_chunk_with_ai
辅助函数在第四章展示的TranscriptProcessor.process_transcript
主循环中被调用,逐块处理记录。
代理
的内部工作原理
通过序列图理解agent.run()
的内部流程:
流程说明:
TranscriptProcessor
调用agent.run()
传入块处理提示文本Agent
知晓需要符合SummaryResponse
的输出,因此向Pydantic模型索取JSON模式(技术性JSON格式描述)Agent
构建最终提示,包含原始指令、文本块和pydantic-ai
生成的系统指令(要求生成符合模式的JSON)- 通过特定
AIModelConnector
(如OllamaModel
)将提示发送至实际AI模型 - 外部AI模型处理提示并返回原始文本响应
Agent
接收原始文本,尝试解析为JSON格式- 使用
SummaryResponse
模型验证解析数据:是否包含必需字段?类型是否正确? - 验证失败且剩余重试次数时,构造新提示要求AI修正格式并重试
- 验证成功(或重试耗尽)后返回
SummaryResponse
对象或错误
该流程强制AI生成后端可直接使用的结构化数据,有效应对AI可能产生的格式错误。
使用Pydantic-AI代理的优势
使用pydantic-ai
带来的关键优势:
功能特性 | 优势说明 |
---|---|
模型抽象 | 通过统一的代理 接口与不同AI供应商(Claude/Ollama/Groq/OpenAI)交互,无需为每个AI重写逻辑 |
结构化输出 | 引导AI生成符合Pydantic模型的数据格式,避免手动解析非结构化文本 |
验证机制 | 自动检查AI输出是否符合Pydantic蓝图,及早发现错误 |
鲁棒性 | 通过修正或重试处理AI格式错误,提高处理可靠性 |
代码清晰度 | 使用Pydantic模型 明确定义AI输出格式,提升代码可读性 |
通过处理不同AI交互的复杂性和确保输出结构正确,Pydantic-AI代理
是文字记录处理逻辑
的关键组件,使后端能可靠获取AI生成的结构化数据。
总结
本章探讨了meeting-minutes
项目如何使用**pydantic-ai
库进行人工智能模型交互**。
我们了解到代理
作为智能中介,既处理与各种AI供应商的通信,更重要的是确保AI原始文本输出转换为严格符合Pydantic模型的有效结构化数据。这些可靠的结构化输出随后被文字记录处理逻辑用于汇总最终摘要。
当处理逻辑(由Pydantic-AI代理驱动)成功生成最终结构化摘要(SummaryResponse
Pydantic对象)后,这些重要数据需要持久化存储以供前端后续获取。
下一章我们将深入数据库管理系统,探索后端如何存储原始记录、处理状态和最终生成的摘要。
第六章:数据库管理
第6章:数据库管理
探索meeting-minutes后端系统的旅程中,已经了解
后端API
网关如何接收请求- 如何通过
摘要数据结构
(Pydantic模型)定义数据结构 - 转录处理逻辑如何
拆分长文本
- 如何通过AI模型交互(Pydantic-AI
代理
)可靠地与AI模型交互获取结构化结果
。
所有这些步骤都会产生重要信息:原始转录文本、处理任务详情(如使用的模型)、任务状态(是否完成)以及最终的摘要数据。
这些信息存储在哪里?如果服务器停止或计算机重启,数据会丢失吗?
这正是数据库管理要解决的问题。
数据库管理解决的问题
想象我们在白板上进行重要工作,一旦擦除或离开房间,所有信息都将消失。计算机程序的内存存储也存在类似临时性,程序停止时内存数据即丢失。我们需要持久化存储方案——即使程序关闭也能保留数据。
这正是数据库的作用。它如同应用程序的长期记忆或文件柜,通过结构化方式将数据存储于磁盘,实现随时存取。在我们的项目中,数据库需要存储:
- 会议详情(如标题)
- 原始转录文本
- 处理任务信息(唯一ID、状态如’待处理’/‘处理中’/‘完成’/‘失败’)
- 最终结构化摘要
- 配置设置(如AI模型API密钥)
若无数据库,每次启动应用都将从零开始,丢失所有历史记录!
解决方案:SQLite与DatabaseManager
本项目采用**SQLite**数据库:
- 轻量:无需独立服务,单文件存储(
meeting_minutes.db
) - 文件化:数据直接写入磁盘文件
- SQL支持:通过标准SQL语言交互
- 异步支持:使用
aiosqlite
库实现非阻塞操作
所有数据库交互由backend/app/db.py
中的DatabaseManager
类管理,负责连接SQLite文件并执行SQL指令。
数据库结构(表设计)
数据库通过表组织数据,主要表结构如下:
表名 | 存储内容 | 关键字段 |
---|---|---|
transcript_chunks | 原始转录文本及处理配置 | meeting_id , transcript_text , model |
summary_processes | 处理任务状态与结果 | meeting_id , status , result (JSON) |
settings | 应用配置 | provider , model , API密钥字段 |
另有meetings
和transcripts
表用于历史数据存储,但核心流程聚焦前两表。
用例:处理数据的保存与更新
结合第1章和第4章流程,数据库交互如下:
- 前端发送转录:POST请求至
/process-transcript
- API网关接收:创建处理记录并分配
process_id
- 保存原始数据:存储文本及处理参数
- 后台处理:调用处理逻辑与AI模型
- 更新任务状态:完成时更新状态并存储结果
数据库交互流程示意图:
关键方法:
创建处理记录
class DatabaseManager:async def create_process(self, meeting_id: str) -> str:"""创建新处理记录"""async with self._get_connection() as conn:# 尝试更新现有记录await conn.execute("""UPDATE summary_processes SET status = ?, updated_at = ? WHERE meeting_id = ?""", ("PENDING", now, meeting_id))# 无记录则新建if conn.total_changes == 0:await conn.execute("""INSERT INTO summary_processes VALUES (?, ?, ?, ?)""",(meeting_id, "PENDING", now, now))await conn.commit()return meeting_id
保存转录数据
async def save_transcript(self, meeting_id: str, text: str):"""保存转录文本"""async with self._get_connection() as conn:await conn.execute("""INSERT INTO transcript_chunks VALUES (?, ?)""", (meeting_id, text))await conn.commit()
更新处理状态
async def update_process(self, meeting_id: str, status: str, result: dict):"""更新处理结果"""async with self._get_connection() as conn:await conn.execute("""UPDATE summary_processes SET status = ?, result = ? WHERE meeting_id = ?""",(status, json.dumps(result), meeting_id))await conn.commit()
用例:摘要数据检索
前端通过/get-summary/{process_id}
获取结果:
- 前端发送
GET请求
- API网关调用
数据库查询
- 返回状态及JSON格式结果
数据检索流程:
数据检索方法:
async def get_transcript_data(self, meeting_id: str):"""获取转录数据及处理结果"""async with self._get_connection() as conn:async with conn.execute("""SELECT * FROM summary_processes WHERE meeting_id = ?""", (meeting_id,)) as cursor:row = await cursor.fetchone()return dict(row) if row else None
定义了一个异步函数,用于从数据库中查询特定会议的转录数据和处理结果。
解析
函数名为get_transcript_data
,接收一个会议ID作为参数。使用async with
语句获取数据库连接,并自动管理连接的关闭。
在数据库连接建立后,执行SQL查询语句,查找summary_processes
表中与给定会议ID匹配的记录。
- 如果找到记录,将数据库行转换为字典格式返回;
- 如果没有找到记录,则返回
None
。
关键
- 异步操作:使用
async/await
语法处理数据库IO操作 - 上下文管理:自动处理数据库连接的开启和关闭
- 结果处理:单行查询结果转换为字典格式
- 空值处理:明确处理查询无结果的情况
配置存储
settings
表存储AI模型配置及API密钥:
async def save_api_key(self, provider: str, key: str):"""保存API密钥"""async with self._get_connection() as conn:await conn.execute(f"""UPDATE settings SET {provider}_api_key = ? WHERE id = 1""", (key,))await conn.commit()
数据库管理优势
- 持久化:数据永久保存
- 集中管理:统一存储关键信息
- 结构化:规范化的表结构设计
- 高效查询:通过SQL快速检索
- 数据完整性:维护数据间关联
- 抽象层:封装底层SQL细节
小结
本章探讨了meeting-minutes
项目的数据库管理系统。我们了解到:
- SQLite提供的轻量级持久化存储方案
DatabaseManager
类对数据库操作的核心封装- 关键业务流程中的
数据存储
与检索机制
- 配置信息的标准化管理
数据库作为系统的记忆中枢,确保了从转录处理到摘要生成的全流程数据完整性。
接下来,我们将探索转录文本的源头——第7章:Whisper转录服务器如何将音频转换为文本输入。
第7章:Whisper转录服务器