当前位置: 首页 > news >正文

【LangFuse】数据集与测试

1. 在线标注

在这里插入图片描述

2. 上传已有数据集

import json# 调整数据格式 {"input":{...},"expected_output":"label"}
data = []
with open('my_annotations.jsonl', 'r', encoding='utf-8') as fp:for line in fp:example = json.loads(line.strip())item = {"input": {"outlines": example["outlines"],"user_input": example["user_input"]},"expected_output": example["label"]}data.append(item)
#%%
from langfuse import Langfuse
from langfuse.model import CreateDatasetRequest, CreateDatasetItemRequest
from tqdm import tqdm
import langfusedataset_name = "my-dataset"# 初始化客户端
langfuse=Langfuse()# 创建数据集,如果已存在不会重复创建
try:langfuse.create_dataset(name=dataset_name,# optional descriptiondescription="My first dataset",# optional metadatametadata={"author": "wzr","type": "demo"})
except:pass# 考虑演示运行速度,只上传前50条数据
for item in tqdm(data[:50]):langfuse.create_dataset_item(dataset_name="my-dataset",input=item["input"],expected_output=item["expected_output"])

tqdm

100%|██████████| 50/50 [00:11<00:00, 4.48it/s]

3. 定义评估函数

def simple_evaluation(output, expected_output):return output == expected_output

4.运行测试

Prompt 模板与 Chain(LCEL)

 import ChatOpenAI
from langchain_core.output_parsers import StrOutputParserneed_answer = PromptTemplate.from_template("""
*********
你是老师的助教,你的工作是从学员的课堂交流中选择出需要老师回答的问题,加以整理以交给老师回答。课程内容:
{outlines}
*********
学员输入:
{user_input}
*********
如果这是一个需要老师答疑的问题,回复Y,否则回复N。
只回复Y或N,不要回复其他内容。""")model = ChatOpenAI(temperature=0, seed=42)
parser = StrOutputParser()chain_v1 = (need_answer| model| parser

在LangFuse数据集测试效果:

from concurrent.futures import ThreadPoolExecutor
import threading
from langfuse import Langfuse
from datetime import datetimelangfuse = Langfuse()
lock = threading.Lock()def run_evaluation(chain, dataset_name, run_name):dataset = langfuse.get_dataset(dataset_name)def process_item(item):with lock:# 注意:多线程调用此处要加锁,否则会有id冲突导致丢数据handler = item.get_langchain_handler(run_name=run_name)# Assuming chain.invoke is a synchronous functionoutput = chain.invoke(item.input, config={"callbacks": [handler]})# Assuming handler.root_span.score is a synchronous functionhandler.trace.score(name="accuracy",value=simple_evaluation(output, item.expected_output))print('.', end='', flush=True)# for item in dataset.items:#    process_item(item)with ThreadPoolExecutor(max_workers=4) as executor:executor.map(process_item, dataset.items)
run_evaluation(chain_v1, "my-dataset", "v1-"+datetime.now().strftime("%d/%m/%Y %H:%M:%S"))# 保证全部数据同步到云端
langfuse_context.flush()

tqdm Python进度条

tqdm 是一个快速,可扩展的Python进度条,可以在 Python 长循环中添加一个进度提示信息,用户只需要封装任意的 iterable 在 tqdm(iterable) 函数下即可。

在你的代码中:

from tqdm import tqdmwith tqdm(total=len(dataset.items)) as pbar:for _ in executor.map(process_item, dataset.items):pbar.update(1)

你首先从 tqdm 模块导入了 tqdm 函数,然后在 with 语句中创建了一个 tqdm 对象。total=len(dataset.items) 设置了进度条的总长度,即数据集项目的总数。

for 循环中,executor.map(process_item, dataset.items) 是一个并行执行任务的操作,它会对 dataset.items 中的每一个元素调用 process_item 函数。每次任务完成时,pbar.update(1) 会更新进度条,表示有一个任务已经完成。

这样,当你的程序运行时,你会在控制台上看到一个进度条,它会显示已经完成的任务数量和总任务数量,以及进度条的百分比和估计的剩余时间。这对于长时间运行的任务非常有用,因为它可以让用户知道任务的进度和完成时间。

ThreadPoolExecutor

这段代码使用了Python的concurrent.futures.ThreadPoolExecutor类来创建一个线程池,并使用该线程池并行地执行任务。下面是对这段代码的详细解释:

from concurrent.futures import ThreadPoolExecutorwith ThreadPoolExecutor(max_workers=4) as executor:executor.map(process_item, dataset.items)
  1. ThreadPoolExecutor(max_workers=4): 这里创建了一个ThreadPoolExecutor实例,它管理一个包含4个线程的线程池。max_workers参数定义了线程池中线程的最大数量。

  2. with … as executor: 使用with语句来管理线程池的生命周期。当with语句块开始时,线程池被创建,当with语句块结束时,线程池会被关闭。使用with语句可以确保线程池在不再需要时被正确关闭,释放资源。

  3. executor.map(process_item, dataset.items): map方法是ThreadPoolExecutor的一个方法,它接受一个函数和一个可迭代对象作为参数。在这个例子中,process_item函数会被应用到dataset.items中的每一个元素上。map方法会返回一个结果的迭代器,这些结果与输入的可迭代对象的元素一一对应。

这段代码的作用是创建一个包含4个线程的线程池,然后并行地对dataset.items中的每一个元素调用process_item函数。这是一种并行处理任务的方式,可以提高程序的效率,特别是在处理I/O密集型任务(如网络请求或文件读写)时。

这段代码实现了一个基于Langfuse的并行评估系统,主要用于对LangChain流程进行批量测试和监控。以下是对代码的逐层解析:


代码结构概览

from concurrent.futures import ThreadPoolExecutor
import threading
from langfuse import Langfuse
from datetime import datetimelangfuse = Langfuse()  # 初始化Langfuse客户端
lock = threading.Lock()  # 全局线程锁def run_evaluation(chain, dataset_name, run_name):# 核心评估逻辑# ...

核心功能模块分解

1. 数据加载
dataset = langfuse.get_dataset(dataset_name)
  • 作用:从Langfuse平台加载预定义的测试数据集。
  • 数据格式:假设数据集包含多个items,每个item包含:
    {"input": "用户输入内容",  # 测试输入"expected_output": "期望输出"  # 预期结果
    }
    

2. 多线程任务分发
with ThreadPoolExecutor(max_workers=4) as executor:executor.map(process_item, dataset.items)
  • 技术选型:使用线程池(4线程)并行处理数据集。
  • 性能优势:适合处理I/O密集型任务(如网络请求到OpenAI接口)。
  • 注意事项:需确保chain.invoke和Langfuse操作是线程安全的。

3. 线程安全处理
def process_item(item):with lock:  # 加锁防止Handler ID冲突handler = item.get_langchain_handler(run_name=run_name)
  • 关键问题
    • Langfuse的Handler生成依赖上下文ID,多线程环境下可能产生ID冲突。
    • 不加锁会导致监控数据丢失或错乱。
  • 解决方式:通过全局锁(threading.Lock)序列化Handler的创建过程。

4. 监控埋点与评估
output = chain.invoke(item.input, config={"callbacks": [handler]})handler.trace.score(name="accuracy",value=simple_evaluation(output, item.expected_output)
)
  • 埋点层次
    1. 全链路追踪:通过handler自动记录LangChain每一步的执行过程。
    2. 自定义评分:手动添加准确率(accuracy)评分。
  • 评估逻辑simple_evaluation函数(需自定义)对比输出与预期结果,例如:
    def simple_evaluation(output, expected):return 1 if output.strip() == expected.strip() else 0
    

监控数据示例

在Langfuse后台会生成如下结构的数据:

字段示例值说明
TraceLangChain Evaluation评估任务根节点
├─ Spanprompt_processingLangChain内部步骤
├─ Generationgpt-3.5-turboLLM调用记录
└─ Scoreaccuracy: 0.8自定义评分

关键设计模式

1. 生产者-消费者模式
  • 生产者:线程池分发数据项。
  • 消费者process_item函数处理单个数据项。
2. 回调机制

通过config={"callbacks": [handler]}将Langfuse监控注入LangChain执行过程。

3. 线程安全隔离
  • 全局锁:确保Handler创建原子性。
  • 线程局部存储:Langfuse SDK内部应使用线程局部变量存储上下文。

潜在优化点

1. 错误处理增强
try:output = chain.invoke(...)
except Exception as e:handler.trace.error(str(e))  # 记录异常
2. 动态线程数调整
max_workers = os.cpu_count() * 2  # 根据CPU核心数动态设置
3. 进度可视化
from tqdm import tqdmwith tqdm(total=len(dataset.items)) as pbar:for _ in executor.map(process_item, dataset.items):pbar.update(1)

适用场景

  1. 批量模型测试:对比不同模型版本的效果。
  2. 回归测试:确保代码更新不破坏核心功能。
  3. 异常诊断:通过Trace分析失败案例的详细执行路径。

通过这种设计,开发者可以高效地验证LangChain应用的质量,并在生产环境中持续监控性能表现。

http://www.lryc.cn/news/546930.html

相关文章:

  • 【Python】如何解决Jupyter Notebook修改外部模块后必须重启内核的问题?
  • Redis 篇
  • React + TypeScript 实战指南:用类型守护你的组件
  • 从零开始:Linux环境下如何制作静态库与动态库
  • 【智能体Agent】ReAct智能体的实现思路和关键技术
  • Java进阶:Zookeeper相关笔记
  • QT-绘画事件
  • 鸿蒙NEXT开发-端云一体化开发
  • 大模型——股票分析AI工具开发教程
  • nexus 实现https 私有镜像搭建
  • 颈椎X光数据集(cervical spine X-ray dataset)
  • (动态规划 完全背包 零钱兑换)leetcode 322
  • 【AI大模型】DeepSeek + Kimi 高效制作PPT实战详解
  • Pytorch的一小步,昇腾芯片的一大步
  • rabbitmq-amqp事务消息+消费失败重试机制+prefetch限流
  • 【HarmonyOS Next】自定义Tabs
  • Sass 模块化革命:深入解析 @use 语法,打造高效 CSS 架构
  • 【渗透测试】反弹 Shell 技术详解(一)
  • python:pymunk + pygame 模拟六边形中小球弹跳运动
  • Windows 图形显示驱动开发-WDDM 3.2-本机 GPU 围栏对象(二)
  • 23种设计模式之《模板方法模式(Template Method)》在c#中的应用及理解
  • DEV-C++ 为什么不能调试?(正确解决方案)
  • 【C++设计模式】第五篇:原型模式(Prototype)
  • 深入 Vue.js 组件开发:从基础到实践
  • maven导入spring框架
  • 数据守护者:备份文件的重要性与自动化实践策略
  • MyBatis @Param 注解详解:指定的参数找不到?
  • 【项目日记(八)】内存回收与联调
  • 性能测试监控工具jmeter+grafana
  • 016.3月夏令营:数理类