当前位置: 首页 > article >正文

openpi π₀ 项目部署运行逻辑(三)——策略推理服务器 serve_policy.py

 π₀ 主控脚本都在 scripts 中:

其中,serve_policy.py 是 openpi 中的策略推理服务端脚本,作用为:启动一个 WebSocket 服务器,加载预训练策略模型,等待外部请求(如来自 main.py 的控制程序),然后执行动作推理并返回结果

一句话总结一下:

将一个 Pi0 策略模型部署为网络服务(WebSocket API),供机器人主控进程远程调用

目录

1 库引用

2 参数定义

3 加载策略模型

4 启动推理服务

5 使用方法总结

5.1 使用方法

5.2 问题分析


1 库引用

import dataclasses  # 用于创建结构化参数对象
import enum          # 用于定义环境枚举类型
import logging       # 用于日志输出
import socket        # 获取本机 IP 地址信息import tyro          # 命令行参数解析库,比 argparse 更现代# 引入策略和策略配置模块
from openpi.policies import policy as _policy
from openpi.policies import policy_config as _policy_config
# 引入 WebSocket 推理服务模块
from openpi.serving import websocket_policy_server
# 引入训练配置模块
from openpi.training import config as _config

2 参数定义

# 定义支持的机器人环境枚举类型
class EnvMode(enum.Enum):ALOHA = "aloha"ALOHA_SIM = "aloha_sim"DROID = "droid"LIBERO = "libero"# 定义用于加载 checkpoint 的参数结构
@dataclasses.dataclass
class Checkpoint:config: str  # 模型对应的训练配置名,如 "pi0_aloha_sim"dir: str     # checkpoint 目录路径(可以是本地或 S3)# 定义默认策略类型占位符结构
@dataclasses.dataclass
class Default:pass  # 使用默认模型配置(例如从 DEFAULT_CHECKPOINT 中选择)# 定义主入口参数结构体
@dataclasses.dataclass
class Args:env: EnvMode = EnvMode.ALOHA_SIM         # 使用的机器人环境default_prompt: str | None = None        # 默认语言提示词port: int = 8000                         # WebSocket 服务端口record: bool = False                     # 是否记录策略输出(用于调试)policy: Checkpoint | Default = dataclasses.field(default_factory=Default)# 定义每个环境对应的默认 checkpoint
DEFAULT_CHECKPOINT: dict[EnvMode, Checkpoint] = {EnvMode.ALOHA: Checkpoint(config="pi0_aloha",dir="s3://openpi-assets/checkpoints/pi0_base",),EnvMode.ALOHA_SIM: Checkpoint(config="pi0_aloha_sim",dir="s3://openpi-assets/checkpoints/pi0_aloha_sim",),EnvMode.DROID: Checkpoint(config="pi0_fast_droid",dir="s3://openpi-assets/checkpoints/pi0_fast_droid",),EnvMode.LIBERO: Checkpoint(config="pi0_fast_libero",dir="s3://openpi-assets/checkpoints/pi0_fast_libero",),
}

其中,class Args 使用 Tyro 来从命令行解析参数,包括以下选项:

  • --env:选择环境,如 ALOHA, DROID(仅在使用默认模型时起作用)
  • --default_prompt:语言提示词(自然语言)
  • --port:监听的端口(默认 8000)
  • --record:是否开启动作记录(用于调试)
  • --policy:加载指定 checkpoint,支持:1. Checkpoint(config="...", dir="...")  2. Default() 使用内置 checkpoint

3 加载策略模型

# 从默认环境创建策略模型
def create_default_policy(env: EnvMode, *, default_prompt: str | None = None) -> _policy.Policy:if checkpoint := DEFAULT_CHECKPOINT.get(env):return _policy_config.create_trained_policy(_config.get_config(checkpoint.config),  # 加载配置checkpoint.dir,                         # 加载参数default_prompt=default_prompt           # 设置默认语言提示)raise ValueError(f"Unsupported environment mode: {env}")# 从命令行参数创建策略模型
def create_policy(args: Args) -> _policy.Policy:match args.policy:case Checkpoint():  # 如果显式指定 checkpointreturn _policy_config.create_trained_policy(_config.get_config(args.policy.config),args.policy.dir,default_prompt=args.default_prompt)case Default():     # 否则使用默认模型return create_default_policy(args.env, default_prompt=args.default_prompt)

分两种情况:

1. 手动指定模型与路径,即增加参数 --policy:

--policy 'Checkpoint(config="pi0_aloha", dir="s3://openpi-assets/checkpoints/pi0_base")'

2. 使用内置默认策略(会从 S3 下载),即不指定参数 --policy:

uv run scripts/serve_policy.py --env ALOHA --default_prompt='fold the towel'

4 启动推理服务

# 主逻辑函数
def main(args: Args) -> None:policy = create_policy(args)            # 加载策略对象policy_metadata = policy.metadata       # 获取策略的元信息if args.record:                         # 如果启用记录,包裹为记录器policy = _policy.PolicyRecorder(policy, "policy_records")# 获取本机 IP 和主机名(日志打印)hostname = socket.gethostname()local_ip = socket.gethostbyname(hostname)logging.info("Creating server (host: %s, ip: %s)", hostname, local_ip)# 创建 WebSocket 推理服务对象server = websocket_policy_server.WebsocketPolicyServer(policy=policy,host="0.0.0.0",               # 监听所有接口port=args.port,               # 指定端口metadata=policy_metadata,    # 附加元信息)server.serve_forever()           # 启动阻塞服务循环# 脚本入口:解析 CLI 参数并运行主函数
if __name__ == "__main__":logging.basicConfig(level=logging.INFO, force=True)  # 初始化日志main(tyro.cli(Args))  # 使用 tyro 从命令行解析 Args 并运行

启动一个 WebSocket 服务端,监听端口,等待来自主控系统(如 main.py)的动作请求,完成如下功能:

  • 接收来自客户端的 observation(图像、语言提示等)
  • 执行 policy.infer(...) 
  • 返回预测的动作结果

5 使用方法总结

5.1 使用方法

✅ 最简单用法(使用默认模型):

uv run scripts/serve_policy.py --env ALOHA --default_prompt='fold the towel'

自动从 S3 下载 pi0_aloha 的默认模型,并运行推理服务器,端口0.0.0.0:8000

✅ 使用本地模型:

uv run scripts/serve_policy.py policy:checkpoint \--policy.config=pi0_aloha \--policy.dir=/home/yejiangchen/.cache/openpi/openpi-assets/checkpoints/pi0_aloha_towel

✅ 启用推理行为记录:

uv run scripts/serve_policy.py --record ...

将推理行为记录下来,便于调试

如果在使用本地模型时出现 bug,运行以下命令查看:

uv run scripts/serve_policy.py policy:checkpoint --help

运行成功可以看到:

可以看到日志:

  • 模型已经被正常加载(包括 JAX/Flax 环境、参数、norm stats)
  • 推理服务已启动,在 0.0.0.0:8000 监听 WebSocket 请求
  • 各种 INFO 日志都是正常的底层库启动信息(比如尝试加载 rocm、tpu,最后还是用的 CUDA/CPU,不影响使用)
  • 检查点参数与归一化参数(norm stats)都已找到并成功加载

5.2 问题分析

可以看到 GPU 启动问题,检查 JAX 是否用上 GPU,输入:

(pi0) yejiangchen@yejiangchen-System-Product-Name:~/Desktop/Codes/openpi-main$ python
Python 3.11.11 (main, Dec 11 2024, 16:28:39) [GCC 11.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import jax
>>> print(jax.devices())
[CudaDevice(id=0)]

可以看到:jax.devices() 输出为 [CudaDevice(id=0)],这说明 JAX 在当前环境下已经可以正确识别并调用 GPU(CUDA)。 CUDA 驱动、jax、jaxlib 安装都没有问题

其实只要 jax.devices() 显示 CudaDevice,模型实际计算就会在 GPU 上执行,即便启动日志没高亮显示 gpu 关键字也没关系

JAX/Flax 框架有时不会像 PyTorch 那样在日志里明确输出 “Using CUDA”。它只会在代码第一次涉及到数组/张量运算时,将数据移动到 GPU。如果已经能看到 [CudaDevice(id=0)],说明后续所有计算都会尽量在 GPU 上进行

运行时使用 nvtop 查看 GPU 占用:

安装:

sudo apt install nvtop

使用:

nvtop

可以看到 GPU 基本拉满了

http://www.lryc.cn/news/2392021.html

相关文章:

  • WEB3—— 简易NFT铸造平台(ERC-721)-入门项目推荐
  • 基于vue框架的独居老人上门护理小程序的设计r322q(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。
  • Android 15 控制亮屏灭屏接口实现
  • 【前端】Hexo一键生成目录插件推荐_放入Hexo博客
  • 每日一题——提取服务器物料型号并统计出现次数
  • 宫格导航--纯血鸿蒙组件库AUI
  • RNN 循环神经网络:原理与应用
  • React---day2
  • 若依框架 账户管理 用户分配界面解读
  • 文档贡献 | 技术文档贡献流程及注意事项(保姆级教程)
  • open-vscode-server +nodejs 安装
  • 知行之桥如何将消息推送到钉钉群?
  • 09《从依赖管理到容器化部署:Maven 全链路实战笔记,解锁 Java 项目自动化构建的终极奥秘》
  • <el-date-picker>组件传参时,选中时间和传参偏差8小时
  • ST MCU CAN模块--TTCAN模式浅析
  • MySQL数据库零基础入门教程:从安装配置到数据查询全掌握【MySQL系列】
  • 动态规划(7):背包问题
  • 谷歌浏览器Google Chrome v137.0.7151.41 中文版本版+插件 v1.11.1
  • 《深入解析UART协议及其硬件实现》-- 第三篇:UART ASIC实现优化与低功耗设计
  • Hadoop常用端口号和配置文件
  • Apache Paimon:存储结构、写入及其源码分析
  • 19、Python字符串高阶实战:转义字符深度解析、高效拼接与输入处理技巧
  • 国芯思辰| 同步降压转换器CN2020应用于智能电视,替换LMR33620
  • 6个月Python学习计划 Day 8 - Python 函数基础
  • DeepSeek 提示词大全
  • 俄罗斯无人机自主任务规划!UAV-CodeAgents:基于多智能体ReAct和视觉语言推理的可扩展无人机任务规划
  • 结构性设计模式之Bridge(桥接)
  • CSS篇-1
  • Android 16系统源码_无障碍辅助(一)认识无障碍服务
  • 分布式数据库备份实践