ROS2 笔记汇总(3) 动作
动作(Action):对“一个完整行为过程”的通信机制
在 ROS 系统中,“动作”是继话题(Topic)和服务(Service)之后的第三种核心通信机制,主要用于处理耗时较长、可被中断或需要进度反馈的任务。
通信模型:以“机器人转圈”为例
我们希望让机器人执行一个耗时、连续、可观察进度、可随时终止的任务:
指令:转圈(旋转360度)
过程:缓慢旋转,每次转一点
场景:机器人不在身边,你需要实时了解状态
一般通信方式的局限
通信方式 | 能力 | 局限 |
---|---|---|
话题(Topic) | 可持续广播状态(如当前位置) | 不能发出具体指令;你收不到执行结果 |
服务(Service) | 一问一答,能发指令并收到最终结果 | 无法获取中间状态,也不能取消 |
所以我们需要更复杂的通信方式:动作(Action)通信模型
动作通信模型分三步:
步骤 | 角色 | 说明 |
---|---|---|
发送目标(Goal) | 客户端 → 服务端 | 告诉机器人:“请开始转圈” |
反馈过程(Feedback) | 服务端 → 客户端 | 每隔0.1秒报告一下“我已经转了x度了” |
最终结果(Result) | 服务端 → 客户端 | 转完360度后返回:“转圈成功!”或失败 |
中断任务(Cancel)
在 Action 中,客户端还能在中途取消任务,比如:
“机器人别转了,我不想让你转圈了。”
服务端会收到取消请求,并干净地中断任务,反馈结果为“被取消”。
为什么用动作(Action)来做“转圈+反馈进度”的任务?
需求 | 是否满足 |
---|---|
任务需要执行一段时间 | ✔️ |
需要定期反馈执行进度 | ✔️ |
需要通知最终执行结果 | ✔️ |
有可能要取消动作 | ✔️ |
而这四个能力——只有 动作通信(Action)机制 全都具备。
客户端/服务器模型
动作和服务类似,使用的也是客户端和服务器模型,客户端发送动作的目标,想让机器人干什么,服务器端执行动作过程, 控制机器人达到运动的目标,同时周期反馈动作执行过程中的状态。
客户端发送一个运动的目标,想让机器人动起来,服务器端收到之后,就开始控制机器人运动,一边运动,一边反馈当前的状态,如果是一个导航动作,这个反馈可能是当前所处的坐标,如果是机械臂抓取,这个反馈可能又是机械臂的实时姿态。当运动执行结束后,服务器再反馈一个动作结束的信息。整个通信过程就此结束。
综上所述,我们对此总结一下。
动作通信的完整流程(功能理解)
“客户端发送目标 → 服务器控制运动 → 周期性反馈状态 → 最后发送结果”
这是动作通信最关键的四个阶段:
发送目标
客户端通过发送一个运动命令(例如移动到某个点),请求机器人开始执行一个任务。
任务执行与反馈
服务端开始执行动作,并以固定频率通过反馈消息(例如位置、角度、夹爪状态)告诉客户端当前任务进度。
任务结束与结果
一旦任务完成(或被取消),服务端发送结果,告诉客户端执行是否成功、附带信息等。
通信结束
当前动作的通信流程终止,客户端可发起下一个任务。
支持一对多通信(多个客户端)
你的描述:
“动作通信中的客户端可以有多个…服务器端只能有一个”
这是非常重要的架构特性:
ROS2 中,多个客户端 可以同时发请求。
但服务端 一次只能处理一个动作目标(Goal),其他请求会被拒绝或排队。
举例:
多个任务管理模块都想控制机器人运动(多个客户端)
但机器人本体只有一个移动执行模块(一个服务端)
因此,动作机制天然适合“多人指令 + 单体设备”的架构设计。
动作是同步通信机制
你还提到:
“既然有反馈,那动作也是一种同步通信机制”
这是对的,动作不同于话题(异步,发了就不管了),动作:
客户端发了请求后,会 等待反馈和最终响应
甚至可以 中途取消,所以是典型的同步通信模式
这也使得动作特别适合:
机器人运动
机械臂抓取
导航/转圈/多段任务 等
ROS2 三大通信机制:话题(Topic)、服务(Service)、动作(Action) 的对比表格,便于你快速理解它们之间的区别与适用场景:
特性/机制 | 话题(Topic) | 服务(Service) | 动作(Action) |
---|---|---|---|
通信模型 | 发布/订阅(Pub/Sub) | 客户端/服务器(Client/Server) | 客户端/服务器 + 反馈(Client/Server + Feedback) |
通信方式 | 异步:发了就不管是否收到 | 同步:发送请求等待应答 | 同步:发请求并获得反馈+结果,支持取消 |
数据流向 | 单向(Publisher → Subscriber) | 双向(Client ⇄ Server) | 双向(Client ⇄ Server)+ 单向反馈(Server → Client) |
通信频率 | 可持续不断发送(如传感器数据) | 一次请求,一次应答 | 一次请求,多次反馈 + 最终应答 |
是否有反馈 | 无 | 有(只有一次) | 有(周期反馈 + 最终结果) |
是否可取消 | 不可取消 | 不可取消 | 可取消动作执行 |
典型用途 | 实时传感器数据、运动指令(周期性) | 参数配置、查询一次性信息 | 复杂任务控制,如导航、机械臂运动、进度可控的行为 |
接口定义 | .msg 文件 | .srv 文件 | .action 文件 |
客户端数量 | 多个订阅者 | 多个客户端 | 多个客户端,但服务端同一时刻处理一个动作 |
服务端数量 | 一个或多个发布者 | 通常一个服务器 | 通常一个动作服务器 |
在你的自定义包中创建一个动作文件:
路径:your_package/action/Fibonacci.action
int32 order
---
int32[] sequence
---
int32[] partial_sequence
order
是客户端发送的目标(要求生成多少项的斐波那契数列)。
partial_sequence
是服务器每次的进度反馈。
sequence
是服务器最终返回的完整序列。
然后在 CMakeLists.txt
中添加:
find_package(action_msgs REQUIRED)
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"action/Fibonacci.action"
DEPENDENCIES builtin_interfaces
)
在 package.xml
中添加:
<build_depend>action_msgs</build_depend>
<exec_depend>action_msgs</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from your_package.action import Fibonacci
class FibonacciActionServer(Node):
def __init__(self):
super().__init__('fibonacci_server')
self._action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback)
async def execute_callback(self, goal_handle):
self.get_logger().info(f'Executing goal: order={goal_handle.request.order}')
feedback_msg = Fibonacci.Feedback()
feedback_msg.partial_sequence = [0, 1]
for i in range(2, goal_handle.request.order):
feedback_msg.partial_sequence.append(
feedback_msg.partial_sequence[i-1] + feedback_msg.partial_sequence[i-2])
self.get_logger().info(f'Feedback: {feedback_msg.partial_sequence}')
goal_handle.publish_feedback(feedback_msg)
await rclpy.sleep(1)
goal_handle.succeed()
result = Fibonacci.Result()
result.sequence = feedback_msg.partial_sequence
return result
def main(args=None):
rclpy.init(args=args)
node = FibonacciActionServer()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from your_package.action import Fibonacci
class FibonacciActionClient(Node):
def __init__(self):
super().__init__('fibonacci_client')
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
def send_goal(self, order=10):
goal_msg = Fibonacci.Goal()
goal_msg.order = order
self._action_client.wait_for_server()
self._send_future = self._action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback)
self._send_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected')
return
self.get_logger().info('Goal accepted')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info(f'Feedback: {feedback.partial_sequence}')
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info(f'Result: {result.sequence}')
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
node = FibonacciActionClient()
node.send_goal(order=10)
rclpy.spin(node)
对于上述代码的解析如下所示
引入 ROS2 动作接口和你自定义的动作接口 Fibonacci.action
。
from rclpy.action import ActionServer
from your_package.action import Fibonacci
初始化节点并创建动作服务器:
self._action_server = ActionServer(
self,
Fibonacci, # 接口类型
'fibonacci', # 动作名称(客户端也会连这个名字)
self.execute_callback # 执行函数(收到目标后的主处理流程)
)
execute_callback
异步函数逻辑:
goal_handle.request.order
:客户端请求的“数列长度”。
feedback_msg.partial_sequence
:用于持续反馈每一步计算结果。
goal_handle.publish_feedback(feedback_msg)
:每计算一项就发布一次反馈。
goal_handle.succeed()
:告诉客户端任务成功。
return result
:返回最终结果。
async def execute_callback(self, goal_handle):
接着就是客户端内容如下所示:
创建连接到名为 "fibonacci"
的动作服务器的客户端。
Fibonacci
是通信接口,需匹配 .action
定义。
self._action_client = ActionClient(self, Fibonacci, 'fibonacci')
异步发送请求,传入 order
值(比如 10)。
feedback_callback
:每次接收到服务器反馈都会执行的函数。
goal_msg.order = order
self._send_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
检查目标是否被服务器接受。
如果接受了,就注册 get_result_callback()
来等结果。
def goal_response_callback(self, future):
goal_handle = future.result()
当服务器返回最终结果时,打印出完整的斐波那契数列。
调用 rclpy.shutdown()
结束客户端运行。
def get_result_callback(self, future):
result = future.result().result