当前位置: 首页 > news >正文

ROS2 笔记汇总(3) 动作

动作(Action):对“一个完整行为过程”的通信机制

在 ROS 系统中,“动作”是继话题(Topic)和服务(Service)之后的第三种核心通信机制,主要用于处理耗时较长、可被中断或需要进度反馈的任务。

通信模型:以“机器人转圈”为例

我们希望让机器人执行一个耗时、连续、可观察进度、可随时终止的任务:

指令:转圈(旋转360度)
过程:缓慢旋转,每次转一点
场景:机器人不在身边,你需要实时了解状态

 一般通信方式的局限

通信方式能力局限
话题(Topic)可持续广播状态(如当前位置)不能发出具体指令;你收不到执行结果
服务(Service)一问一答,能发指令并收到最终结果无法获取中间状态,也不能取消

所以我们需要更复杂的通信方式:动作(Action)通信模型

动作通信模型分三步:

步骤角色说明
发送目标(Goal)客户端 → 服务端告诉机器人:“请开始转圈”
反馈过程(Feedback)服务端 → 客户端每隔0.1秒报告一下“我已经转了x度了”
最终结果(Result)服务端 → 客户端转完360度后返回:“转圈成功!”或失败

中断任务(Cancel)

在 Action 中,客户端还能在中途取消任务,比如:

“机器人别转了,我不想让你转圈了。”

服务端会收到取消请求,并干净地中断任务,反馈结果为“被取消”。

为什么用动作(Action)来做“转圈+反馈进度”的任务?

需求是否满足
任务需要执行一段时间✔️
需要定期反馈执行进度✔️
需要通知最终执行结果✔️
有可能要取消动作✔️

而这四个能力——只有 动作通信(Action)机制 全都具备。

客户端/服务器模型

动作和服务类似,使用的也是客户端和服务器模型,客户端发送动作的目标,想让机器人干什么,服务器端执行动作过程, 控制机器人达到运动的目标,同时周期反馈动作执行过程中的状态。
客户端发送一个运动的目标,想让机器人动起来,服务器端收到之后,就开始控制机器人运动,一边运动,一边反馈当前的状态,如果是一个导航动作,这个反馈可能是当前所处的坐标,如果是机械臂抓取,这个反馈可能又是机械臂的实时姿态。当运动执行结束后,服务器再反馈一个动作结束的信息。整个通信过程就此结束。

综上所述,我们对此总结一下。

动作通信的完整流程(功能理解)

“客户端发送目标 → 服务器控制运动 → 周期性反馈状态 → 最后发送结果”

这是动作通信最关键的四个阶段:

发送目标
客户端通过发送一个运动命令(例如移动到某个点),请求机器人开始执行一个任务。

任务执行与反馈
服务端开始执行动作,并以固定频率通过反馈消息(例如位置、角度、夹爪状态)告诉客户端当前任务进度。

任务结束与结果
一旦任务完成(或被取消),服务端发送结果,告诉客户端执行是否成功、附带信息等。

通信结束
当前动作的通信流程终止,客户端可发起下一个任务。

支持一对多通信(多个客户端)

你的描述:

“动作通信中的客户端可以有多个…服务器端只能有一个”

这是非常重要的架构特性:

ROS2 中,多个客户端 可以同时发请求

但服务端 一次只能处理一个动作目标(Goal),其他请求会被拒绝或排队。

举例:

多个任务管理模块都想控制机器人运动(多个客户端)

但机器人本体只有一个移动执行模块(一个服务端)

因此,动作机制天然适合“多人指令 + 单体设备”的架构设计

动作是同步通信机制

你还提到:

“既然有反馈,那动作也是一种同步通信机制”

这是对的,动作不同于话题(异步,发了就不管了),动作:

客户端发了请求后,会 等待反馈和最终响应

甚至可以 中途取消,所以是典型的同步通信模式

这也使得动作特别适合:

机器人运动

机械臂抓取

导航/转圈/多段任务 等

ROS2 三大通信机制:话题(Topic)、服务(Service)、动作(Action) 的对比表格,便于你快速理解它们之间的区别与适用场景

特性/机制话题(Topic)服务(Service)动作(Action)
通信模型发布/订阅(Pub/Sub)客户端/服务器(Client/Server)客户端/服务器 + 反馈(Client/Server + Feedback)
通信方式异步:发了就不管是否收到同步:发送请求等待应答同步:发请求并获得反馈+结果,支持取消
数据流向单向(Publisher → Subscriber)双向(Client ⇄ Server)双向(Client ⇄ Server)+ 单向反馈(Server → Client)
通信频率可持续不断发送(如传感器数据)一次请求,一次应答一次请求,多次反馈 + 最终应答
是否有反馈有(只有一次)有(周期反馈 + 最终结果)
是否可取消不可取消不可取消可取消动作执行
典型用途实时传感器数据、运动指令(周期性)参数配置、查询一次性信息复杂任务控制,如导航、机械臂运动、进度可控的行为
接口定义.msg 文件.srv 文件.action 文件
客户端数量多个订阅者多个客户端多个客户端,但服务端同一时刻处理一个动作
服务端数量一个或多个发布者通常一个服务器通常一个动作服务器

在你的自定义包中创建一个动作文件:
路径:your_package/action/Fibonacci.action

int32 order
---
int32[] sequence
---
int32[] partial_sequence
 

order 是客户端发送的目标(要求生成多少项的斐波那契数列)。

partial_sequence 是服务器每次的进度反馈。

sequence 是服务器最终返回的完整序列。

然后在 CMakeLists.txt 中添加:

find_package(action_msgs REQUIRED)
find_package(rosidl_default_generators REQUIRED)

rosidl_generate_interfaces(${PROJECT_NAME}
  "action/Fibonacci.action"
  DEPENDENCIES builtin_interfaces
)
 

package.xml 中添加:

<build_depend>action_msgs</build_depend>
<exec_depend>action_msgs</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
 

import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from your_package.action import Fibonacci

class FibonacciActionServer(Node):
    def __init__(self):
        super().__init__('fibonacci_server')
        self._action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback)

    async def execute_callback(self, goal_handle):
        self.get_logger().info(f'Executing goal: order={goal_handle.request.order}')
        feedback_msg = Fibonacci.Feedback()
        feedback_msg.partial_sequence = [0, 1]

        for i in range(2, goal_handle.request.order):
            feedback_msg.partial_sequence.append(
                feedback_msg.partial_sequence[i-1] + feedback_msg.partial_sequence[i-2])
            self.get_logger().info(f'Feedback: {feedback_msg.partial_sequence}')
            goal_handle.publish_feedback(feedback_msg)
            await rclpy.sleep(1)

        goal_handle.succeed()
        result = Fibonacci.Result()
        result.sequence = feedback_msg.partial_sequence
        return result

def main(args=None):
    rclpy.init(args=args)
    node = FibonacciActionServer()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()
 

import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from your_package.action import Fibonacci

class FibonacciActionClient(Node):
    def __init__(self):
        super().__init__('fibonacci_client')
        self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

    def send_goal(self, order=10):
        goal_msg = Fibonacci.Goal()
        goal_msg.order = order

        self._action_client.wait_for_server()
        self._send_future = self._action_client.send_goal_async(
            goal_msg,
            feedback_callback=self.feedback_callback)
        self._send_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected')
            return

        self.get_logger().info('Goal accepted')
        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def feedback_callback(self, feedback_msg):
        feedback = feedback_msg.feedback
        self.get_logger().info(f'Feedback: {feedback.partial_sequence}')

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info(f'Result: {result.sequence}')
        rclpy.shutdown()

def main(args=None):
    rclpy.init(args=args)
    node = FibonacciActionClient()
    node.send_goal(order=10)
    rclpy.spin(node)
 

对于上述代码的解析如下所示

引入 ROS2 动作接口和你自定义的动作接口 Fibonacci.action

from rclpy.action import ActionServer
from your_package.action import Fibonacci

初始化节点并创建动作服务器:

self._action_server = ActionServer(
    self,
    Fibonacci,              # 接口类型
    'fibonacci',            # 动作名称(客户端也会连这个名字)
    self.execute_callback   # 执行函数(收到目标后的主处理流程)
)

execute_callback 异步函数逻辑:

goal_handle.request.order:客户端请求的“数列长度”。

feedback_msg.partial_sequence:用于持续反馈每一步计算结果。

goal_handle.publish_feedback(feedback_msg):每计算一项就发布一次反馈。

goal_handle.succeed():告诉客户端任务成功。

return result:返回最终结果。

async def execute_callback(self, goal_handle):

接着就是客户端内容如下所示:

创建连接到名为 "fibonacci" 的动作服务器的客户端。

Fibonacci 是通信接口,需匹配 .action 定义。

self._action_client = ActionClient(self, Fibonacci, 'fibonacci')

异步发送请求,传入 order 值(比如 10)。

feedback_callback:每次接收到服务器反馈都会执行的函数。

goal_msg.order = order
self._send_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)

检查目标是否被服务器接受。

如果接受了,就注册 get_result_callback() 来等结果。

def goal_response_callback(self, future):
    goal_handle = future.result()

当服务器返回最终结果时,打印出完整的斐波那契数列。

调用 rclpy.shutdown() 结束客户端运行。

def get_result_callback(self, future):
    result = future.result().result
 

http://www.lryc.cn/news/572528.html

相关文章:

  • Linux树莓派项目实战:外网访问、PWM呼吸灯、超声波测距与驱动开发
  • 《思维力:高效的系统思维》
  • 【开源模型】高考数学139分!小米MiMo开源模型:7B参数突出重围
  • MySQL 的 WITH ROLLUP 功能
  • MySQL: Invalid use of group function
  • swing综合案例.
  • Github 热点项目 [特殊字符]PHP性能革命!FrankenPHP让Laravel/Symfony飞起来!
  • (哈希)128. 最长连续序列
  • 5G核心网周期性注册更新机制:信令流程与字段解析
  • Python 数据分析与可视化 Day 1 - Pandas 数据分析基础入门
  • 算法导论第十九章 并行算法:解锁计算新维度
  • 防火墙的禁用及开启
  • Stable Diffusion 实战-手机壁纸制作 第二篇:优化那些“崩脸”和“马赛克”问题,让图像更加完美!
  • ROS学习之动作通信
  • C#建立与数据库连接(版本问题的解决方案)踩坑总结
  • E结构体基础.go
  • 67、数据访问-crud实验-分页数据展示
  • ROS学习之服务通信
  • flink的多种部署模式
  • 【LLM学习笔记3】搭建基于chatgpt的问答系统(下)
  • 面试-操作系统
  • 【Dify 沙箱网络问题排查与解决】
  • 工程师生活:清除电热水壶(锅)水垢方法
  • 3DTiles三维模型
  • Python-循环结构解析
  • windows 11 安装和配置 WSL #1 Ubuntu
  • 30.all和any
  • 动态规划:01 背包(闫氏DP分析法)
  • python脚本间的相互调用
  • 磐基PaaS平台MongoDB组件SSPL许可证风险与合规性分析(上)