当前位置: 首页 > news >正文

如何测试模型推理性能:从零开始的Python指南

如何测试模型推理性能:从零开始的Python指南

    • 什么是模型推理性能?
    • 测试模型推理性能的步骤
      • 1. 监测内存使用情况
      • 2. 测试模型吞吐量
    • 运行测试
    • 总结

在机器学习和深度学习中,模型的推理性能是一个非常重要的指标。它可以帮助我们了解模型在实际应用中的表现,尤其是在处理大规模数据时。本文将带你一步步了解如何测试模型的推理性能,并使用Python编写简单的代码来实现这一目标。

什么是模型推理性能?

模型推理性能主要关注两个方面:

  1. 内存使用情况:模型在推理过程中占用的内存大小。
  2. 模型吞吐量:模型在单位时间内能够处理的token数量。吞吐量又分为两个阶段:
    • Prefill阶段:模型预先计算并缓存一部分自注意力计算的过程。
    • Decode阶段:模型在自回归阶段不断生成新token的过程。

测试模型推理性能的步骤

我们将使用Python编写两个脚本:monitor.pybenchmark.pymonitor.py 用于监测内存使用情况,benchmark.py 用于测试模型的吞吐量。

1. 监测内存使用情况

首先,我们需要编写一个脚本来监测模型推理时的内存使用情况。我们将使用 psutil 库来实现这一功能。

import psutil
import time
import os
import signal
import sys
import atexit
import jsoncurrent_pid = os.getpid()
print(f"当前进程pid号为: {current_pid}")monitored = sys.argv[1]
print(f"需要监测的进程为: {monitored}")
target_pid = None# 清除残留进程
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):if monitored in proc.info['cmdline'] and proc.pid != current_pid:print(f"残留进程为: {proc.pid}")proc.kill()print("残留进程清除完成")print("\r\n")
print("寻找指定进程,请打开吞吐量测试脚本,并在吞吐量测试完毕后关闭此脚本:")# 寻找目标进程
while not target_pid:for proc in psutil.process_iter(['pid', 'name', 'cmdline']):if monitored in proc.info['cmdline'] and proc.pid != current_pid:target_pid = proc.pidbreakif not target_pid:print("未找到指定进程")time.sleep(1)print("\r\n")
print(f"已找到指定进程pid号为: {target_pid}")
process = psutil.Process(target_pid)
process_cmdline = process.cmdline()
print(f"pid对应名称为 {process_cmdline}")max_memory_stats = {"max_rss": 0,"max_vms": 0
}# 导出内存使用情况到JSON文件
def write_memory_max_to_json():with open('memory_results.json', 'w') as report_file:json.dump(max_memory_stats, report_file, indent=2)print("\n已成功将内存使用情况保存至 'memory_results.json' 文件中.")atexit.register(write_memory_max_to_json)  # 在进程结束时保存结果# 实时监测内存使用情况
while 1:time.sleep(0.1)mem_info = process.memory_info()rss = mem_info.rss / 1024 ** 2vms = mem_info.vms / 1024 ** 2max_memory_stats["max_rss"] = max(max_memory_stats["max_rss"], rss)max_memory_stats["max_vms"] = max(max_memory_stats["max_vms"], vms)print(f"Current RSS Memory: {rss:.2f} MB | Current VMS Memory: {vms:.2f} MB")print(f"Max RSS Memory: {max_memory_stats['max_rss']:.2f} MB | Max VMS Memory: {max_memory_stats['max_vms']:.2f} MB\n")

2. 测试模型吞吐量

接下来,我们编写一个脚本来测试模型的吞吐量。我们将使用 transformers 库来加载模型并进行推理。

import torch
import time
from transformers import AutoModelForCausalLM, AutoTokenizer
import json
import osos.environ["CUDA_VISIBLE_DEVICES"] = "-1"  # 使用CPU进行推理# 加载分词器和模型
print("加载分词器\r\n")
tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-0.5B", trust_remote_code=True)
print("加载模型\r\n")
model = AutoModelForCausalLM.from_pretrained("Qwen/Qwen2.5-0.5B", device_map="auto", trust_remote_code=True).eval()
print("模型加载完成\r\n")# 加载指定文本作为prompt
print("加载prompt\r\n")
with open('prompt.txt', 'r', encoding='utf-8') as file:prompt = file.read()# 对prompt进行分词
print("分词器分词\r\n")
inputs = tokenizer(prompt, return_tensors='pt')
inputs = inputs.to(model.device)# Prefill阶段吞吐量测试
print("prefill阶段吞吐量测试:\r\n")
batch_size = 1  # 单个prompt,批次大小为1
total_prompts = 10  # 测试10次
total_tokens = inputs['input_ids'].shape[1]  # token数量start_time_prefill = time.time()
for _ in range(total_prompts):with torch.no_grad():  # 关闭梯度计算以提高推理性能outputs = model(**inputs)
end_time_prefill = time.time()
elapsed_time_prefill = end_time_prefill - start_time_prefill  # 推理总时长
throughput_prefill = total_prompts * total_tokens / elapsed_time_prefill  # prefill吞吐量,每秒处理的token数
print(f"tokens总数为 {total_tokens}")
print(f"测试次数为 {total_prompts}")
print(f"总时长为 {elapsed_time_prefill}")
print(f"模型prefill阶段的吞吐量: {throughput_prefill:.2f} tokens/s\r\n")
print("prefill阶段吞吐量测试完成\r\n")# Decode阶段吞吐量测试
print("decode阶段吞吐量测试:\r\n")
max_new_tokens = 50  # 要推理的新token总数
total_prompts = 10  # 测试10次start_time_decode = time.time()
for _ in range(total_prompts):with torch.no_grad():  # 关闭梯度计算以提高推理性能outputs = model.generate(**inputs, min_new_tokens=max_new_tokens, max_new_tokens=max_new_tokens)print(f"请确保生成new_tokens为50 {total_tokens} --> {outputs.shape}")
end_time_decode = time.time()
elapsed_time_decode = end_time_decode - start_time_decode  # 推理总时长
throughput_decode = total_prompts * max_new_tokens / elapsed_time_decode  # decode吞吐量,每秒生成的新token数
print(f"生成新tokens数为 {max_new_tokens}")
print(f"测试次数为 {total_prompts}")
print(f"总时长为 {elapsed_time_decode}")
print(f"模型decode的吞吐量: {throughput_decode:.2f} tokens/s\r\n")
print("decode阶段吞吐量测试完成\r\n")# 保存吞吐量测试结果
results = {"prefill_throughput": throughput_prefill,"decode_throughput": throughput_decode
}
with open('throughput_results.json', 'w') as output_file:json.dump(results, output_file, indent=4)
print("\n已成功将吞吐量结果保存至 'throughput_results.json' 文件中.")

运行测试

  1. 首先,在命令行中运行 monitor.py 脚本,并指定要监测的脚本为 benchmark.py

    python monitor.py benchmark.py
    
  2. 接着,打开一个新的终端窗口,运行 benchmark.py 脚本:

    python benchmark.py
    
  3. 等待 benchmark.py 运行结束后,monitor.py 会自动结束,并将内存使用情况和吞吐量测试结果分别保存到 memory_results.jsonthroughput_results.json 文件中。

总结

通过以上步骤,你可以轻松地测试模型的推理性能。无论是内存使用情况还是模型吞吐量,这些指标都能帮助你更好地了解模型的实际表现。希望这篇博客能帮助你入门模型性能测试,并为你未来的项目提供有价值的参考。

http://www.lryc.cn/news/511580.html

相关文章:

  • 我们来学activiti -- bpmn
  • 【每日学点鸿蒙知识】节点析构问题、区分手机和pad、 Navigation路由问题、Tabs组件宽度、如何监听Map
  • 敏捷测试文化的转变
  • 如何配置线程池参数,才能创建性能最好、最稳定的Spring异步线程池?
  • 【时间之外】IT人求职和创业应知【80】-特殊日子
  • Vue中接入萤石等直播视频(更新中ing)
  • 如何学习、使用Ai,才能跟上时代的步伐?
  • RabbitMQ中的异步Confirm模式:提升消息可靠性的利器
  • Linux(Centos 7.6)目录结构详解
  • upload-labs关卡记录8
  • GXUOJ-算法-第二次作业
  • Gavin Wood 的 Polkadot 2024 年度回顾:技术突破与未来的无限可能
  • AduSkin、WPF-UI、Prism:WPF 框架全解析与应用指南
  • 【超详细】Git的基本概念和基本使用方式
  • 【数据结构】单链表的使用
  • 外键约束的应用层维护
  • springboot整合log4j2日志框架1
  • 06 - Django 视图view
  • 基于云计算的资源管理系统
  • 从0入门自主空中机器人-3-【环境与常用软件安装】
  • electron node-api addon开发
  • 如何在嵌入式系统或计算机系统中验证boot程序
  • scala基础学习_运算符
  • 【ANGULAR网站开发】初始环境搭建
  • 【Java】面试题 并发安全 (2)
  • springboot启动不了 因一个spring-boot-starter-web底下的tomcat-embed-core依赖丢失
  • React 组件的通信方式
  • WAV文件双轨PCM格式详细说明及C语言解析示例
  • 【ES6复习笔记】数值扩展(16)
  • 百度热力图数据日期如何选择