【LangFuse】数据集与测试
1. 在线标注
2. 上传已有数据集
import json# 调整数据格式 {"input":{...},"expected_output":"label"}
data = []
with open('my_annotations.jsonl', 'r', encoding='utf-8') as fp:for line in fp:example = json.loads(line.strip())item = {"input": {"outlines": example["outlines"],"user_input": example["user_input"]},"expected_output": example["label"]}data.append(item)
#%%
from langfuse import Langfuse
from langfuse.model import CreateDatasetRequest, CreateDatasetItemRequest
from tqdm import tqdm
import langfusedataset_name = "my-dataset"# 初始化客户端
langfuse=Langfuse()# 创建数据集,如果已存在不会重复创建
try:langfuse.create_dataset(name=dataset_name,# optional descriptiondescription="My first dataset",# optional metadatametadata={"author": "wzr","type": "demo"})
except:pass# 考虑演示运行速度,只上传前50条数据
for item in tqdm(data[:50]):langfuse.create_dataset_item(dataset_name="my-dataset",input=item["input"],expected_output=item["expected_output"])
tqdm
100%|██████████| 50/50 [00:11<00:00, 4.48it/s]
3. 定义评估函数
def simple_evaluation(output, expected_output):return output == expected_output
4.运行测试
Prompt 模板与 Chain(LCEL)
import ChatOpenAI
from langchain_core.output_parsers import StrOutputParserneed_answer = PromptTemplate.from_template("""
*********
你是老师的助教,你的工作是从学员的课堂交流中选择出需要老师回答的问题,加以整理以交给老师回答。课程内容:
{outlines}
*********
学员输入:
{user_input}
*********
如果这是一个需要老师答疑的问题,回复Y,否则回复N。
只回复Y或N,不要回复其他内容。""")model = ChatOpenAI(temperature=0, seed=42)
parser = StrOutputParser()chain_v1 = (need_answer| model| parser
在LangFuse数据集测试效果:
from concurrent.futures import ThreadPoolExecutor
import threading
from langfuse import Langfuse
from datetime import datetimelangfuse = Langfuse()
lock = threading.Lock()def run_evaluation(chain, dataset_name, run_name):dataset = langfuse.get_dataset(dataset_name)def process_item(item):with lock:# 注意:多线程调用此处要加锁,否则会有id冲突导致丢数据handler = item.get_langchain_handler(run_name=run_name)# Assuming chain.invoke is a synchronous functionoutput = chain.invoke(item.input, config={"callbacks": [handler]})# Assuming handler.root_span.score is a synchronous functionhandler.trace.score(name="accuracy",value=simple_evaluation(output, item.expected_output))print('.', end='', flush=True)# for item in dataset.items:# process_item(item)with ThreadPoolExecutor(max_workers=4) as executor:executor.map(process_item, dataset.items)
run_evaluation(chain_v1, "my-dataset", "v1-"+datetime.now().strftime("%d/%m/%Y %H:%M:%S"))# 保证全部数据同步到云端
langfuse_context.flush()
tqdm Python进度条
tqdm
是一个快速,可扩展的Python进度条,可以在 Python 长循环中添加一个进度提示信息,用户只需要封装任意的 iterable 在 tqdm(iterable)
函数下即可。
在你的代码中:
from tqdm import tqdmwith tqdm(total=len(dataset.items)) as pbar:for _ in executor.map(process_item, dataset.items):pbar.update(1)
你首先从 tqdm
模块导入了 tqdm
函数,然后在 with
语句中创建了一个 tqdm
对象。total=len(dataset.items)
设置了进度条的总长度,即数据集项目的总数。
在 for
循环中,executor.map(process_item, dataset.items)
是一个并行执行任务的操作,它会对 dataset.items
中的每一个元素调用 process_item
函数。每次任务完成时,pbar.update(1)
会更新进度条,表示有一个任务已经完成。
这样,当你的程序运行时,你会在控制台上看到一个进度条,它会显示已经完成的任务数量和总任务数量,以及进度条的百分比和估计的剩余时间。这对于长时间运行的任务非常有用,因为它可以让用户知道任务的进度和完成时间。
ThreadPoolExecutor
这段代码使用了Python的concurrent.futures.ThreadPoolExecutor
类来创建一个线程池,并使用该线程池并行地执行任务。下面是对这段代码的详细解释:
from concurrent.futures import ThreadPoolExecutorwith ThreadPoolExecutor(max_workers=4) as executor:executor.map(process_item, dataset.items)
-
ThreadPoolExecutor(max_workers=4): 这里创建了一个
ThreadPoolExecutor
实例,它管理一个包含4个线程的线程池。max_workers
参数定义了线程池中线程的最大数量。 -
with … as executor: 使用
with
语句来管理线程池的生命周期。当with
语句块开始时,线程池被创建,当with
语句块结束时,线程池会被关闭。使用with
语句可以确保线程池在不再需要时被正确关闭,释放资源。 -
executor.map(process_item, dataset.items):
map
方法是ThreadPoolExecutor
的一个方法,它接受一个函数和一个可迭代对象作为参数。在这个例子中,process_item
函数会被应用到dataset.items
中的每一个元素上。map
方法会返回一个结果的迭代器,这些结果与输入的可迭代对象的元素一一对应。
这段代码的作用是创建一个包含4个线程的线程池,然后并行地对dataset.items
中的每一个元素调用process_item
函数。这是一种并行处理任务的方式,可以提高程序的效率,特别是在处理I/O密集型任务(如网络请求或文件读写)时。
这段代码实现了一个基于Langfuse的并行评估系统,主要用于对LangChain流程进行批量测试和监控。以下是对代码的逐层解析:
代码结构概览
from concurrent.futures import ThreadPoolExecutor
import threading
from langfuse import Langfuse
from datetime import datetimelangfuse = Langfuse() # 初始化Langfuse客户端
lock = threading.Lock() # 全局线程锁def run_evaluation(chain, dataset_name, run_name):# 核心评估逻辑# ...
核心功能模块分解
1. 数据加载
dataset = langfuse.get_dataset(dataset_name)
- 作用:从Langfuse平台加载预定义的测试数据集。
- 数据格式:假设数据集包含多个
items
,每个item
包含:{"input": "用户输入内容", # 测试输入"expected_output": "期望输出" # 预期结果 }
2. 多线程任务分发
with ThreadPoolExecutor(max_workers=4) as executor:executor.map(process_item, dataset.items)
- 技术选型:使用线程池(4线程)并行处理数据集。
- 性能优势:适合处理I/O密集型任务(如网络请求到OpenAI接口)。
- 注意事项:需确保
chain.invoke
和Langfuse操作是线程安全的。
3. 线程安全处理
def process_item(item):with lock: # 加锁防止Handler ID冲突handler = item.get_langchain_handler(run_name=run_name)
- 关键问题:
- Langfuse的Handler生成依赖上下文ID,多线程环境下可能产生ID冲突。
- 不加锁会导致监控数据丢失或错乱。
- 解决方式:通过全局锁(
threading.Lock
)序列化Handler的创建过程。
4. 监控埋点与评估
output = chain.invoke(item.input, config={"callbacks": [handler]})handler.trace.score(name="accuracy",value=simple_evaluation(output, item.expected_output)
)
- 埋点层次:
- 全链路追踪:通过
handler
自动记录LangChain每一步的执行过程。 - 自定义评分:手动添加准确率(accuracy)评分。
- 全链路追踪:通过
- 评估逻辑:
simple_evaluation
函数(需自定义)对比输出与预期结果,例如:def simple_evaluation(output, expected):return 1 if output.strip() == expected.strip() else 0
监控数据示例
在Langfuse后台会生成如下结构的数据:
字段 | 示例值 | 说明 |
---|---|---|
Trace | LangChain Evaluation | 评估任务根节点 |
├─ Span | prompt_processing | LangChain内部步骤 |
├─ Generation | gpt-3.5-turbo | LLM调用记录 |
└─ Score | accuracy: 0.8 | 自定义评分 |
关键设计模式
1. 生产者-消费者模式
- 生产者:线程池分发数据项。
- 消费者:
process_item
函数处理单个数据项。
2. 回调机制
通过config={"callbacks": [handler]}
将Langfuse监控注入LangChain执行过程。
3. 线程安全隔离
- 全局锁:确保Handler创建原子性。
- 线程局部存储:Langfuse SDK内部应使用线程局部变量存储上下文。
潜在优化点
1. 错误处理增强
try:output = chain.invoke(...)
except Exception as e:handler.trace.error(str(e)) # 记录异常
2. 动态线程数调整
max_workers = os.cpu_count() * 2 # 根据CPU核心数动态设置
3. 进度可视化
from tqdm import tqdmwith tqdm(total=len(dataset.items)) as pbar:for _ in executor.map(process_item, dataset.items):pbar.update(1)
适用场景
- 批量模型测试:对比不同模型版本的效果。
- 回归测试:确保代码更新不破坏核心功能。
- 异常诊断:通过Trace分析失败案例的详细执行路径。
通过这种设计,开发者可以高效地验证LangChain应用的质量,并在生产环境中持续监控性能表现。