当前位置: 首页 > news >正文

CyberRT-共享内存实现

CyberRT共享内存类图

共享内存消息发布

在这里插入图片描述
数据用共享内存发布时,首先会创建ShmTransmitter对象,包含两个主要成员segment和notifier,Segment用于创建共享内存(上面绿色部分),Notifer 最终构建ReadableInfo通知给其他进程。
使用哪个ConditionNotifier-> notify或MulticastNotifier->notify,是在创建时根据配置文件决定的。
ConditionNotifier 在构建时会创建Indicator对象保存到共享内存中。
调ConditionNotifier-> notify,实际时将ReadableInfo保存到Indicator对象。

ConditionNotifier 共享内存数据接收

在这里插入图片描述
在接收数据时,也会创建同样的共享内存。如果共享内存存在,则直接打开。
在接收端也有同样的共享内存操作ConditionNotifier 。
ShmDispatcher会持有多个通道segment,用std::unordered_map<channelid, segment>表示。
同时启动一个后台线程ThreadFunc 线程轮询处理消息回调。

void ShmDispatcher::ThreadFunc() {ReadableInfo readable_info;// 轮询处理while (!is_shutdown_.load()) {// 100ms, Listen会转换100000 ms,对比seq,如果不等处理消息。每次轮询会等待递减50ms。if (!notifier_->Listen(100, &readable_info)) {ADEBUG << "listen failed.";continue;}if (readable_info.host_id() != host_id_) {ADEBUG << "shm readable info from other host.";continue;}//从共享内存Indicator中读出的数据uint64_t channel_id = readable_info.channel_id();uint32_t block_index = readable_info.block_index();{ReadLockGuard<AtomicRWLock> lock(segments_lock_);if (segments_.count(channel_id) == 0) {continue;}// check block index// std::unordered_map<uint64_t, uint32_t> previous_indexes_; // 保存key: channelID, value: block_indexif (previous_indexes_.count(channel_id) == 0) {previous_indexes_[channel_id] = UINT32_MAX;}uint32_t& previous_index = previous_indexes_[channel_id];if (block_index != 0 && previous_index != UINT32_MAX) {if (block_index == previous_index) {ADEBUG << "Receive SAME index " << block_index << " of channel "<< channel_id;} else if (block_index < previous_index) {ADEBUG << "Receive PREVIOUS message. last: " << previous_index<< ", now: " << block_index;} else if (block_index - previous_index > 1) {ADEBUG << "Receive JUMP message. last: " << previous_index<< ", now: " << block_index;}}previous_index = block_index;ReadMessage(channel_id, block_index);}}
}

MulticastNotifier共享内存数据接收

MulticastNotifier时采用多播socket实现的,默认

std::string mcast_ip("239.255.0.100");
uint16_t mcast_port = 8888;

创建两个socket notify_fd_ 用于发生消息,listen_addr用于接收消息。
在这里插入图片描述
在发送端调用Notify时,时调的MulticastNotifier::Nofify(const ReadableInfo& info)

bool MulticastNotifier::Notify(const ReadableInfo& info) {if (is_shutdown_.load()) {return false;}std::string info_str;info.SerializeTo(&info_str);ssize_t nbytes =sendto(notify_fd_, info_str.c_str(), info_str.size(), 0,(struct sockaddr*)&notify_addr_, sizeof(notify_addr_));return nbytes > 0;
}

接收端用同样的方式轮询

bool MulticastNotifier::Listen(int timeout_ms, ReadableInfo* info) {if (is_shutdown_.load()) {return false;}if (info == nullptr) {AERROR << "info nullptr.";return false;}struct pollfd fds;fds.fd = listen_fd_;fds.events = POLLIN;int ready_num = poll(&fds, 1, timeout_ms);if (ready_num > 0) {char buf[32] = {0};  // larger than ReadableInfo::kSizessize_t nbytes = recvfrom(listen_fd_, buf, 32, 0, nullptr, nullptr);if (nbytes == -1) {AERROR << "fail to recvfrom, " << strerror(errno);return false;}return info->DeserializeFrom(buf, nbytes);} else if (ready_num == 0) {ADEBUG << "timeout, no readableinfo.";} else {if (errno == EINTR) {AINFO << "poll was interrupted.";} else {AERROR << "fail to poll, " << strerror(errno);}}return false;
}
bool Block::TryLockForWrite() {int32_t rw_lock_free = kRWLockFree;//lock_num_ == rw_lock_free, kWriteExclusive赋值给lock_num_,返回true//lock_num_ != rw_lock_free, lock_num_赋值给rw_lock_free,返回falseif (!lock_num_.compare_exchange_weak(rw_lock_free, kWriteExclusive,std::memory_order_acq_rel,std::memory_order_relaxed)) {ADEBUG << "lock num: " << lock_num_.load();return false;}return true;
}

总结
1、CyberRT的共享内存读写都时需要加锁的。
2、每次写数据可以是不连续的block
3、每次当Block.lock_num_= 0:空闲,>0:有读操作, -1 : 写操作。
效率不是高。

http://www.lryc.cn/news/240326.html

相关文章:

  • linux通过串口传输文件
  • uniapp 打包后各静态资源加载失败的问题(背景图,字体等)
  • 关于git hooks
  • mongodb数据库的常用操作语句
  • ubuntu安装完qt后发现找不到图标
  • bazel远程构建(Remote Execution) -- Buildfarm部署中的问题
  • 论文阅读:MedSegDiff: Medical Image Segmentation with Diffusion Probabilistic Model
  • openssl加解密-干货分享
  • 【考研数据结构代码题7】求一元多项式之和
  • python避坑指南(更新中)
  • 可以远程控制电脑桌面的软件有哪些?
  • 洛谷 P1250 种树
  • java大视频在线预览(支持断点下载)
  • OpenCV入门10——特征点检测与匹配
  • 教育机构拒绝“数据陷阱”,群硕将英孚新一代教学管理系统搬上桌
  • 小辰的智慧树(差分+前缀和)
  • Windows如何使用key登录Linux服务器
  • k8s无法删除pv,pvc问题
  • 基于框架的线性回归
  • 万宾科技智能井盖传感器使用方式,具有什么效果?
  • 13.什么是Spring beans?
  • 算法通关村第十二关|白银|字符串经典基础面试题
  • Spring框架学习 -- 读取和存储Bean对象
  • APM工具skywalking部署
  • MFC打开可执行文件exe
  • css实现原生form表单label必填选项红色*样式,以及js控制必填校验
  • 10_6 input输入子系统,流程解析
  • 竞赛选题 题目:垃圾邮件(短信)分类 算法实现 机器学习 深度学习 开题
  • Web前端—移动Web第三天(移动Web基础、rem、less、综合案例—极速问诊)
  • MySQL--慢查询(一)