当前位置: 首页 > news >正文

LCM中间件入门(2):LCM核心实现原理解析

文章目录

        • 一、`good()`函数:LCM实例状态检查的实现原理
          • 1. 实现逻辑
          • 2. 简化代码示例(C语言核心逻辑)
        • 二、`publish()`:向指定channel发送消息的原理
          • 1. 完整流程拆解
          • 2. 简化代码示例(C++核心逻辑)
        • 三、`subscribe()`:接收指定channel消息的原理
          • 1. 完整流程拆解
          • 2. 简化代码示例(C++核心逻辑)
      • 四、整体协同机制总结

LCM的核心功能基于C语言实现(C++接口为其封装),其底层通过 UDP网络通信消息序列化回调管理实现发布-订阅模式。以下从代码原理层面解析关键函数的工作机制。

一、good()函数:LCM实例状态检查的实现原理

good()函数用于判断LCM实例是否初始化成功,其核心是检查LCM内部关键资源的有效性。

1. 实现逻辑

LCM实例(lcm_t结构体)的核心成员包括:

  • 网络套接字(socket_fd):用于收发数据的UDP套接字;
  • 线程状态(thread_running):接收消息的后台线程是否启动;
  • 错误码(error):记录初始化或运行中的错误状态。

good()函数的本质是检查这些成员是否处于“可用状态”:

  • 套接字是否成功创建(socket_fd != -1);
  • 后台线程是否正常运行(针对需要异步接收的模式);
  • 无致命错误(error == 0)。
2. 简化代码示例(C语言核心逻辑)
// LCM实例结构体(简化)
typedef struct {int socket_fd;          // UDP套接字描述符int thread_running;     // 接收线程状态(1=运行,0=停止)int error;              // 错误码(0=无错误)// 其他成员:回调表、组播地址等
} lcm_t;// good()函数实现
int lcm_good(lcm_t *lcm) {return (lcm != NULL && lcm->socket_fd != -1 && lcm->thread_running && lcm->error == 0);
}

在C++接口中,lcm::LCM::good()是对上述C函数的封装,返回bool类型。

二、publish():向指定channel发送消息的原理

publish()的核心是将消息序列化为字节流,并通过UDP组播发送到与channel关联的网络地址,同时在数据包中嵌入channel标识。

1. 完整流程拆解
  1. 消息序列化
    根据.lcm文件生成的编解码函数(如example_temperature_t_pack()),将消息结构体转换为二进制字节流(解决跨平台数据格式差异)。

  2. channel与网络地址映射
    LCM默认将channel名称映射为UDP组播地址(239.255.x.y,其中x.y由channel名称的哈希值计算得出),确保同一channel的消息仅被订阅该channel的节点接收。

  3. 数据包封装
    构造LCM协议数据包,格式为:

    [4字节魔数] + [4字节消息长度] + [channel名称] + [序列化的消息数据]
    

    魔数(0x4C434D00,即"LCM\0")用于接收方识别LCM数据包。

  4. UDP发送
    通过LCM实例的套接字将数据包发送到channel对应的组播地址。

2. 简化代码示例(C++核心逻辑)
// C++ publish()接口
void LCM::publish(const std::string& channel, const void* data, size_t len) {if (!good()) return;  // 检查实例状态// 1. 计算channel对应的组播地址(基于哈希)struct sockaddr_in addr;lcm_channel_to_multicast(channel.c_str(), &addr);  // 内部哈希映射// 2. 封装LCM协议头uint8_t header[8];header[0] = 0x4C; header[1] = 0x43; header[2] = 0x4D; header[3] = 0x00;  // 魔数*(uint32_t*)(header + 4) = htonl(len);  // 消息长度(网络字节序)// 3. 拼接完整数据包:头 + channel + 消息数据std::vector<uint8_t> packet;packet.insert(packet.end(), header, header + 8);packet.insert(packet.end(), channel.begin(), channel.end());packet.push_back('\0');  // channel以空字符结尾packet.insert(packet.end(), (uint8_t*)data, (uint8_t*)data + len);// 4. 通过UDP发送到组播地址sendto(lcm->socket_fd, packet.data(), packet.size(), 0,(struct sockaddr*)&addr, sizeof(addr));
}
三、subscribe():接收指定channel消息的原理

subscribe()的核心是注册回调函数并与channel绑定,后台线程接收数据包后,根据channel查找对应的回调并触发执行。

1. 完整流程拆解
  1. 回调函数注册
    订阅时,LCM将channel名称消息类型回调函数存储在内部的回调表(哈希表,channel -> 回调列表)中。

  2. 后台接收线程
    LCM初始化时启动一个后台线程,循环从套接字读取UDP数据包:

    • 解析数据包,验证魔数和格式;
    • 提取channel名称和序列化的消息数据。
  3. 消息路由与反序列化
    根据解析出的channel名称,在回调表中查找对应的回调函数:

    • 若找到,调用自动生成的反序列化函数(如example_temperature_t_unpack()),将字节流转换为消息结构体;
    • 调用注册的回调函数,传入消息结构体。
  4. 线程安全处理
    回调函数的执行在后台线程中进行,若需在多线程环境中使用,需用户自行添加同步机制(如互斥锁)。

2. 简化代码示例(C++核心逻辑)
// 回调表结构(简化):channel -> 回调函数列表
typedef struct {std::unordered_map<std::string, std::vector<Callback>> callbacks;std::mutex mutex;  // 保护回调表的线程安全
} CallbackTable;// 订阅函数实现
void LCM::subscribe(const std::string& channel, void (*callback)(const ReceiveBuffer*, const std::string&, void*),void* userdata) {std::lock_guard<std::mutex> lock(callback_table.mutex);// 将回调函数注册到channel对应的列表中callback_table.callbacks[channel].emplace_back(callback, userdata);
}// 后台接收线程逻辑
void receive_thread(lcm_t* lcm) {while (lcm->thread_running) {uint8_t buffer[65536];  // UDP最大包长struct sockaddr_in sender;socklen_t sender_len = sizeof(sender);ssize_t n = recvfrom(lcm->socket_fd, buffer, sizeof(buffer), 0,(struct sockaddr*)&sender, &sender_len);if (n <= 0) continue;// 解析数据包:检查魔数、提取channel和消息数据if (buffer[0] != 0x4C || buffer[1] != 0x43 || buffer[2] != 0x4D || buffer[3] != 0x00)continue;  // 非LCM数据包,忽略uint32_t msg_len = ntohl(*(uint32_t*)(buffer + 4));std::string channel = (char*)(buffer + 8);  // channel以空字符结尾const uint8_t* msg_data = buffer + 8 + channel.size() + 1;// 查找回调并执行std::lock_guard<std::mutex> lock(callback_table.mutex);auto it = callback_table.callbacks.find(channel);if (it != callback_table.callbacks.end()) {for (auto& cb : it->second) {// 构造接收缓冲区,调用回调ReceiveBuffer rbuf{msg_data, msg_len};cb.function(&rbuf, channel, cb.userdata);}}}
}

四、整体协同机制总结

工作原理交互图如下:

PublisherLCM_Pub (发布者实例)Network (UDP组播)LCM_Sub (订阅者实例)Subscriber初始化LCM实例创建lcm_t结构体1. 创建UDP套接字2. 启动接收线程3. 初始化错误码为0调用good()返回状态 (socket有效+线程运行+无错误)初始化LCM实例创建lcm_t结构体1. 创建UDP套接字2. 启动接收线程3. 初始化回调表(空)调用good()返回状态 (socket有效+线程运行+无错误)订阅通道THERMOMETER调用subscribe("THERMOMETER", 回调函数)1. 加锁保护回调表2. 将回调函数注册到"THERMOMETER"条目下订阅完成发布消息到通道构造Temperature消息(温度值、时间戳等)调用publish("THERMOMETER", 消息)publish()内部处理1. 消息序列化(调用自动生成的pack函数)2. 计算channel哈希映射到组播地址(239.255.x.y)3. 封装LCM数据包(魔数+长度+channel+序列化数据)通过UDP发送数据包到组播地址接收UDP数据包(含"THERMOMETER"标识)接收线程处理1. 验证魔数和数据包格式2. 提取channel("THERMOMETER")和序列化数据3. 消息反序列化(调用自动生成的unpack函数)4. 查找回调表中"THERMOMETER"对应的回调函数触发回调函数(传入反序列化后的消息)执行回调逻辑(打印温度数据等)PublisherLCM_Pub (发布者实例)Network (UDP组播)LCM_Sub (订阅者实例)Subscriber
  1. 初始化阶段lcm_t实例创建套接字、启动接收线程,`good(
  2. )`验证这些资源是否就绪。
  3. 发布阶段publish()将消息序列化,通过channel映射的组播地址发送UDP包,嵌入channel标识。
  4. 订阅阶段subscribe()将回调注册到channel对应的哈希表;接收线程解析UDP包,根据channel查找回调,反序列化消息后触发执行。

这种设计实现了无中心节点的轻量化通信,通过UDP组播和哈希映射保证低延迟,适用于实时系统中基于channel的高效数据交互。

http://www.lryc.cn/news/605644.html

相关文章:

  • 《人工智能导论》(python版)第2章 python基础2.2编程基础
  • [算法]Leetcode3487
  • Video_1920×1080i 1920_1080p
  • 大白话解释---FreeRTOS中的队列集
  • 基于知识驱动的解释性条件扩散模型用于无对比剂心肌梗死增强合成|文献速递-医学影像算法文献分享
  • CSS和XPATH选择器对比
  • 《Java 程序设计》第 15 章 - 事件处理与常用控件
  • Vibe Coding:AI驱动开发的安全暗礁与防护体系
  • 异步I/O和同步I/O
  • Day15--二叉树--222. 完全二叉树的节点个数,110. 平衡二叉树,257. 二叉树的所有路径,404. 左叶子之和
  • 在Linux中创建LVGL应用
  • Kotlin -> 普通Lambda vs 挂起Lambda
  • 【Django】-1- 开发项目搭建
  • Django模型迁移指南:从命令用法到最佳实践
  • HttpServletRequest详细解释
  • 如何在NPM上发布自己的React组件(包)
  • 人工智能概念之十一:常见的激活函数与参数初始化
  • Cesium 快速入门(四)相机控制完全指南
  • langchain--1--prompt、output格式、LCEL示例
  • 【烧脑算法】Dijkstra 算法:解决最短路问题
  • 会议室预定系统核心技术:如何用一行SQL解决时间冲突检测难题
  • LLC电源原边MOS管DS增加RC吸收对ZVS的影响分析
  • ode with me是idea中用来干嘛的插件
  • 山东移动云主机:技术架构与特性解析
  • AI 安监系统:为工业园安全保驾护航
  • 1 机器学习概述 (第一天2025.7.31)
  • RabbitMQ 队列配置设置 RabbitMQ 消息监听器的并发消费者数量java
  • Java 大视界 -- Java 大数据在智能医疗远程健康监测与疾病预防预警中的应用(374)
  • Linux 进程管理与计划任务
  • linux git ssh配置过程