当前位置: 首页 > news >正文

SylixOS 下的消息队列

1、简介

  消息队列(Message Queue) 是 Linux 提供的一种进程间通信(IPC)机制,允许进程通过发送和接收消息块来进行数据交换。与管道、共享内存不同,消息队列具有结构化、非阻塞和优先级控制等特点。

  MQ 采用链表来实现消息队列,该链表是由系统内核维护,系统中可能有很多的 MQ,每个 MQ 用消息队列描述符(消息队列 ID)来区分。在进行任务间通信时,一个任务将消息加到 MQ 尾端,另一个任务从消息队列中取消息(不一定以先进先出来取消息,也可以按照消息类型字段取消息),这样就实现了任务间的通信。如下 MQ 的模型
在这里插入图片描述
  消息队列的典型特点如下:

  • 异步通信
    发送方和接收方不需同时存在,消息可以先入队,稍后由接收方读取。
  • 消息有结构
    每条消息可以包含一个类型/优先级标识,便于有选择地接收特定类型的消息。
  • 支持阻塞与非阻塞模式
    接收和发送操作可配置为阻塞或非阻塞,提高通信灵活性。
  • 容量限制与排队机制
    消息队列有最大消息数和每条消息最大长度限制,超出后发送可能阻塞或失败。
  • 跨进程通信(IPC)
    支持多个进程之间通信,可用于分布式或模块化系统中。

2、SylixOS 下的消息队列

  一个 SylixOS 消息队列必须要调用 API_MsgQueueCreate 函数创建之后才能使用,如果创建成功,API_MsgQueueCreate 函数将返回一个消息队列的句柄。

  任务如果需要接收消息,可以调用 API_MsgQueueReceive 函数。发送消息可以调用 API_MsgQueueSend 函数。

  SylixOS 支持消息优先级,让一些更紧急的消息得到及时处理。优先级由 API_MsgQueueSendEx2 函数入参控制。

  当一个消息队列使用完毕后(并确保以后也不再使用),应该调用 API_MsgQueueDelete 函数将其删除,SylixOS 会回收该消息队列占用的内核资源。

使用消息队列接口时,要注意一些场景。例如,调用 API_MsgQueueSend、API_MsgQueueReceive 要考虑,该函数是否会引起睡眠?(这是由函数入参决定的)。以上只是案例,实际使用时要小心。

这里只讲解部分接口,完整的接口以及实现请阅读 SylixOS 内核源码。

2.1 创建消息队列

LW_API  
LW_OBJECT_HANDLE  API_MsgQueueCreate (CPCHAR             pcName,ULONG              ulMaxMsgCounter,size_t             stMaxMsgByteSize,ULONG              ulOption,LW_OBJECT_ID      *pulId)
{....../* 函数返回的 pulId 就是 event ID,用来索引指定事件 */__KERNEL_MODE_PROC(pevent = _Allocate_Event_Object();                              /*  申请事件                    */);if (!pevent) {__KERNEL_MODE_PROC(_Free_MsgQueue_Object(pmsgqueue););_DebugHandle(__ERRORMESSAGE_LEVEL, "there is no ID to build a msgqueue.\r\n");_ErrorHandle(ERROR_KERNEL_LOW_MEMORY);return  (LW_OBJECT_HANDLE_INVALID);}/* 初始化消息缓冲区,消息缓冲区大小是不固定的,根据函数入参来决定 */stMaxMsgByteSizeReal   = ROUND_UP(stMaxMsgByteSize, sizeof(size_t))+ sizeof(LW_CLASS_MSGNODE);                  /*  每条消息缓存大小            */stHeapAllocateByteSize = (size_t)ulMaxMsgCounter* stMaxMsgByteSizeReal;                      /*  需要分配的内存总大小        */pvMemAllocate = __KHEAP_ALLOC(stHeapAllocateByteSize);              /*  申请内存                    */if (!pvMemAllocate) {__KERNEL_MODE_PROC(_Free_MsgQueue_Object(pmsgqueue);_Free_Event_Object(pevent););_DebugHandle(__ERRORMESSAGE_LEVEL, "kernel low memory.\r\n");_ErrorHandle(ERROR_KERNEL_LOW_MEMORY);return  (LW_OBJECT_HANDLE_INVALID);}if (pcName) {                                                       /*  拷贝名字                    */lib_strcpy(pevent->EVENT_cEventName, pcName);} else {pevent->EVENT_cEventName[0] = PX_EOS;                           /*  清空名字                    */}pmsgqueue->MSGQUEUE_pvBuffer   = pvMemAllocate;pmsgqueue->MSGQUEUE_stMaxBytes = stMaxMsgByteSize;/* 这里会将申请到的消息缓冲区,根号每个消息大小,组成一个消息链表 */_MsgQueueClear(pmsgqueue, ulMaxMsgCounter);                         /*  缓存区准备好                */......
}

2.2 发送消息队列

LW_API  
ULONG  API_MsgQueueSend2 (LW_OBJECT_HANDLE  ulId,const PVOID       pvMsgBuffer,size_t            stMsgLen,ULONG             ulTimeout)
{....../* 通过事件 ID 找到对应事件,进而找到对应的消息队列 */pmsgqueue = (PLW_CLASS_MSGQUEUE)pevent->EVENT_pvPtr;if (stMsgLen > pmsgqueue->MSGQUEUE_stMaxBytes) {                    /*  长度太长                    */__KERNEL_EXIT_IRQ(iregInterLevel);                              /*  退出内核                    */_DebugHandle(__ERRORMESSAGE_LEVEL, "ulMsgLen invalidate.\r\n");_ErrorHandle(ERROR_MSGQUEUE_MSG_LEN);return  (ERROR_MSGQUEUE_MSG_LEN);}if (_EventWaitNum(EVENT_MSG_Q_R, pevent)) {                         /*  有任务在等待消息            */BOOL    bSendOk = LW_TRUE;/* 尝试将阻塞在接收该消息的任务由阻塞态,修改为就绪态 */if (pevent->EVENT_ulOption & LW_OPTION_WAIT_PRIORITY) {         /*  优先级等待队列              */_EVENT_DEL_Q_PRIORITY(EVENT_MSG_Q_R, ppringList);           /*  激活优先级等待线程          */ptcb = _EventReadyPriorityLowLevel(pevent, LW_NULL, ppringList);} else {_EVENT_DEL_Q_FIFO(EVENT_MSG_Q_R, ppringList);               /*  激活FIFO等待线程            */ptcb = _EventReadyFifoLowLevel(pevent, LW_NULL, ppringList);}if ((stMsgLen > ptcb->TCB_stMaxByteSize) && !(ptcb->TCB_ulRecvOption & LW_OPTION_NOERROR)) {            /*  是否允许自动截断            */*ptcb->TCB_pstMsgByteSize = 0;ptcb->TCB_stMaxByteSize = 0;bSendOk = LW_FALSE;} else {stRealLen = (stMsgLen < ptcb->TCB_stMaxByteSize) ?(stMsgLen) : (ptcb->TCB_stMaxByteSize);         /*  确定信息拷贝长短            */*ptcb->TCB_pstMsgByteSize = stRealLen;                      /*  保存长短                    *//* * 注意,这里会将 msg 信息,直接拷贝到线程的 TCB 中 msg 指向的 buffer 中* TCB 中 msg 指针,是在任务尝试接收消息被阻塞时设置的。msg 指针直接指向阻塞任务的接收消息缓冲区* 这样可以省去一次拷贝操作,不需要在把消息拷贝到消息队列中*/lib_memcpy(ptcb->TCB_pvMsgQueueMessage,                     /*  传递消息                    */pvMsgBuffer, stRealLen);}KN_INT_ENABLE(iregInterLevel);                                  /*  使能中断                    *//* 尝试将阻塞在接收该消息的任务,分配 cpu 运行 */_EventReadyHighLevel(ptcb,LW_THREAD_STATUS_MSGQUEUE,LW_SCHED_ACT_INTERRUPT);                   /*  处理 TCB                    */MONITOR_EVT_LONG2(MONITOR_EVENT_ID_MSGQ, MONITOR_EVENT_MSGQ_POST, ulId, ptcb->TCB_ulId, LW_NULL);/* 这里会引起调度,尝试调度阻塞在接收该消息的任务 */__KERNEL_EXIT();                                                /*  退出内核                    */if (bSendOk == LW_FALSE) {goto    __re_send;                                          /*  重新发送                    */}return  (ERROR_NONE);} else {if (pevent->EVENT_ulCounter < pevent->EVENT_ulMaxCounter) {     /*  检查是否还有空间加          */pevent->EVENT_ulCounter++;/* 这里会将消息放到消息队列缓冲区 */_MsgQueuePut(pmsgqueue, pvMsgBuffer, stMsgLen, EVENT_MSG_Q_PRIO_LOW);                         /*  保存消息                    */__KERNEL_EXIT_IRQ(iregInterLevel);                          /*  退出内核                    */return  (ERROR_NONE);} else {/* 下面就是处理,如果消息队列已经满了的情况,选择是否需要阻塞,等待消息队列有空闲 */if ((ulTimeout == LW_OPTION_NOT_WAIT) || LW_CPU_GET_CUR_NESTING()) {                             /*  不需要等待                  */__KERNEL_EXIT_IRQ(iregInterLevel);                      /*  退出内核                    */_ErrorHandle(ERROR_MSGQUEUE_FULL);return  (ERROR_MSGQUEUE_FULL);}LW_TCB_GET_CUR(ptcbCur);                                    /*  当前任务控制块              */ptcbCur->TCB_iPendQ         = EVENT_MSG_Q_S;ptcbCur->TCB_usStatus      |= LW_THREAD_STATUS_MSGQUEUE;    /*  写状态位,开始等待          */ptcbCur->TCB_ucWaitTimeout  = LW_WAIT_TIME_CLEAR;           /*  清空等待时间                */if (ulTimeout == LW_OPTION_WAIT_INFINITE) {                 /*  是否是无穷等待              */ptcbCur->TCB_ulDelay = 0ul;} else {ptcbCur->TCB_ulDelay = ulTimeout;                       /*  设置超时时间                */}__KERNEL_TIME_GET_IGNIRQ(ulTimeSave, ULONG);                /*  记录系统时间                */if (pevent->EVENT_ulOption & LW_OPTION_WAIT_PRIORITY) {_EVENT_INDEX_Q_PRIORITY(ptcbCur->TCB_ucPriority, ucPriorityIndex);_EVENT_PRIORITY_Q_PTR(EVENT_MSG_Q_S, ppringList, ucPriorityIndex);ptcbCur->TCB_ppringPriorityQueue = ppringList;          /*  记录等待队列位置            */_EventWaitPriority(pevent, ppringList);                 /*  加入优先级等待表            */} else {                                                    /*  按 FIFO 等待                */_EVENT_FIFO_Q_PTR(EVENT_MSG_Q_S, ppringList);           /*  确定 FIFO 队列的位置        */_EventWaitFifo(pevent, ppringList);                     /*  加入 FIFO 等待表            */}KN_INT_ENABLE(iregInterLevel);                              /*  使能中断                    */ulEventOption = pevent->EVENT_ulOption;iSchedRet = __KERNEL_EXIT();                                /*  调度器解锁                  */......        }}
}

  SylixOS 消息队列还支持紧急消息的发送,支持消息的优先级,保证了某些异常情况下的安全。函数接口为:API_MsgQueueSendEx2 ,通过入参 ulOption 来控制消息的优先级。

/*********************************************************************************************************消息队列发送选项 (URGENT 与 BROADCAST 不能同时设置)
*********************************************************************************************************/#define LW_OPTION_URGENT                                0x00000001      /*  消息队列紧急消息发送        */
#define LW_OPTION_URGENT_0                              LW_OPTION_URGENT/*  最高紧急优先级              */
#define LW_OPTION_URGENT_1                              0x00000011
#define LW_OPTION_URGENT_2                              0x00000021
#define LW_OPTION_URGENT_3                              0x00000031
#define LW_OPTION_URGENT_4                              0x00000041
#define LW_OPTION_URGENT_5                              0x00000051
#define LW_OPTION_URGENT_6                              0x00000061
#define LW_OPTION_URGENT_7                              0x00000071      /*  最低紧急优先级              */#define LW_OPTION_BROADCAST                             0x00000002      /*  消息队列广播发送            */LW_API ULONG            API_MsgQueueSendEx2(LW_OBJECT_HANDLE  ulId,const PVOID       pvMsgBuffer,size_t            stMsgLen,ULONG             ulTimeout,ULONG             ulOption);/*  带有超时的发送消息高级接口  */

  关于这里的消息优先级,我们讲解一下 _MsgQueuePut 函数。

/*********************************************************************************************************
** 函数名称: _MsgQueuePut
** 功能描述: 向消息队列中写入一个消息
** 输 入  : pmsgqueue     消息队列控制块 
**         : pvMsgBuffer   消息缓冲区
**         : stMsgLen      消息长度
**         : uiPrio        消息优先级
** 输 出  : NONE
** 全局变量: 
** 调用模块: 
*********************************************************************************************************/
VOID  _MsgQueuePut (PLW_CLASS_MSGQUEUE    pmsgqueue,PVOID                 pvMsgBuffer,size_t                stMsgLen, UINT                  uiPrio)
{PLW_CLASS_MSGNODE  pmsgnode;/* 从消息队列中的空闲消息链表上,找到一个空闲消息块 */pmsgnode = (PLW_CLASS_MSGNODE)_list_mono_allocate(&pmsgqueue->MSGQUEUE_pmonoFree);pmsgnode->MSGNODE_stMsgLen = stMsgLen;lib_memcpy((PVOID)(pmsgnode + 1), pvMsgBuffer, stMsgLen);           /*  拷贝消息                    *//* 这里会设置优先级位图,就是为了确保,从消息队列中取数据时,取的一定是优先级最高的消息 */if (pmsgqueue->MSGQUEUE_pmonoHeader[uiPrio] == LW_NULL) {pmsgqueue->MSGQUEUE_uiMap |= (1 << uiPrio);                     /*  设置优先级位图              */}/* 将消息块链接在指定的优先级链表上(尾插) */_list_mono_free_seq(&pmsgqueue->MSGQUEUE_pmonoHeader[uiPrio], &pmsgqueue->MSGQUEUE_pmonoTail[uiPrio], &pmsgnode->MSGNODE_monoManage);
}

2.3 接收消息队列

/*********************************************************************************************************
** 函数名称: _MsgQueueGet
** 功能描述: 从消息队列中获得一个消息
** 输 入  : pmsgqueue     消息队列控制块
**         : pvMsgBuffer   接收缓冲区
**         : stMaxByteSize 缓冲区大小
**         : pstMsgLen     获得消息的长度
** 输 出  : NONE
** 全局变量: 
** 调用模块: 
*********************************************************************************************************/
VOID  _MsgQueueGet (PLW_CLASS_MSGQUEUE    pmsgqueue,PVOID                 pvMsgBuffer,size_t                stMaxByteSize,size_t               *pstMsgLen)
{INT                 iQ;PLW_CLASS_MSGNODE   pmsgnode;/* 在当前的消息队列中,找到优先级最高的消息块链表 */iQ = archFindLsb(pmsgqueue->MSGQUEUE_uiMap) - 1;                    /*  计算优先级                  */_BugHandle(!pmsgqueue->MSGQUEUE_pmonoHeader[iQ], LW_TRUE, "buffer is empty!\r\n");/* 从消息链表上取下一个消息块,取的顺序是从 header 开始取,即相同优先级,按照先入先出顺序 */pmsgnode = (PLW_CLASS_MSGNODE)_list_mono_allocate_seq(&pmsgqueue->MSGQUEUE_pmonoHeader[iQ],&pmsgqueue->MSGQUEUE_pmonoTail[iQ]);if (pmsgqueue->MSGQUEUE_pmonoHeader[iQ] == LW_NULL) {pmsgqueue->MSGQUEUE_uiMap &= ~(1 << iQ);                        /*  清除优先级位图              */}*pstMsgLen = (stMaxByteSize < pmsgnode->MSGNODE_stMsgLen) ? (stMaxByteSize) : (pmsgnode->MSGNODE_stMsgLen);        /*  确定拷贝信息数量            */lib_memcpy(pvMsgBuffer, (PVOID)(pmsgnode + 1), *pstMsgLen);         /*  拷贝消息                    *//* 将该消息块从消息链表中删除,并回收到消息队列的空闲链表上 */_list_mono_free(&pmsgqueue->MSGQUEUE_pmonoFree, &pmsgnode->MSGNODE_monoManage);
}

  API_MsgQueueReceive 函数中的重点,还是 _MsgQueueGet 函数。

LW_API  
ULONG  API_MsgQueueReceive (LW_OBJECT_HANDLE    ulId,PVOID               pvMsgBuffer,size_t              stMaxByteSize,size_t             *pstMsgLen,ULONG               ulTimeout){......pevent = &_K_eventBuffer[usIndex];iregInterLevel = __KERNEL_ENTER_IRQ();                              /*  进入内核                    */if (_Event_Type_Invalid(usIndex, LW_TYPE_EVENT_MSGQUEUE)) {__KERNEL_EXIT_IRQ(iregInterLevel);                              /*  退出内核                    */_ErrorHandle(ERROR_MSGQUEUE_TYPE);return  (ERROR_MSGQUEUE_TYPE);}/* 通过指定事件 ID,获得消息队列结构 */pmsgqueue = (PLW_CLASS_MSGQUEUE)pevent->EVENT_pvPtr;ptcbCur->TCB_ulRecvOption = LW_OPTION_NOERROR;                      /*  接收大消息自动截断          *//* 如果消息队列中,消息不为空 */if (pevent->EVENT_ulCounter) {                                      /*  事件有效                    */pevent->EVENT_ulCounter--;_MsgQueueGet(pmsgqueue, pvMsgBuffer, stMaxByteSize, pstMsgLen);                         /*  获得消息                    */if (_EventWaitNum(EVENT_MSG_Q_S, pevent)) {                     /*  有任务在等待写消息          */if (pevent->EVENT_ulOption & LW_OPTION_WAIT_PRIORITY) {     /*  优先级等待队列              */_EVENT_DEL_Q_PRIORITY(EVENT_MSG_Q_S, ppringList);       /*  激活优先级等待线程          */ptcb = _EventReadyPriorityLowLevel(pevent, LW_NULL, ppringList);} else {_EVENT_DEL_Q_FIFO(EVENT_MSG_Q_S, ppringList);           /*  激活FIFO等待线程            */ptcb = _EventReadyFifoLowLevel(pevent, LW_NULL, ppringList);}KN_INT_ENABLE(iregInterLevel);                              /*  使能中断                    */_EventReadyHighLevel(ptcb,LW_THREAD_STATUS_MSGQUEUE,LW_SCHED_ACT_INTERRUPT);               /*  处理 TCB                    */__KERNEL_EXIT();                                            /*  退出内核                    */} else {__KERNEL_EXIT_IRQ(iregInterLevel);                          /*  退出内核                    */}return  (ERROR_NONE);}/* 下面就是判断,如果消息队列为空,是否需要阻塞等待 */if (ulTimeout == LW_OPTION_NOT_WAIT) {                              /*  不等待                      */__KERNEL_EXIT_IRQ(iregInterLevel);                              /*  退出内核                    */_ErrorHandle(ERROR_THREAD_WAIT_TIMEOUT);                        /*  超时                        */return  (ERROR_THREAD_WAIT_TIMEOUT);}ptcbCur->TCB_pstMsgByteSize    = pstMsgLen;ptcbCur->TCB_stMaxByteSize     = stMaxByteSize;/* 如果在 receive 消息时被阻塞,则将 buffer 赋值给 TCB 中的 msg 指针。再被唤醒时,数据已经被拷贝到 buffer 中了 */ptcbCur->TCB_pvMsgQueueMessage = pvMsgBuffer;                       /*  记录信息                    */......
}
http://www.lryc.cn/news/573329.html

相关文章:

  • Jupyter notebook调试:设置断点运行
  • Redis后端的简单了解与使用(项目搭建前置)
  • DeepEP开源MoE模型分布式通信库
  • 洛谷P3953 [NOIP 2017 提高组] 逛公园
  • 【DCS开源项目】—— Lua 如何调用 DLL、DLL 与 DCS World 的交互
  • day44-硬件学习之arm启动代码
  • 【Datawhale组队学习202506】零基础学爬虫 02 数据解析与提取
  • 【simulink】IEEE5节点系统潮流仿真模型(2机5节点全功能基础模型)
  • 【智能体】dify部署本地步骤
  • LeetCode第279题_完全平方数
  • 湖北理元理律师事务所企业债务纾困路径:司法重整中的再生之道
  • 蓝桥杯备赛篇(上) - 参加蓝桥杯所需要的基础能力 1(C++)
  • 华为OD机试_2025 B卷_判断一组不等式是否满足约束并输出最大差(Python,100分)(附详细解题思路)
  • 车载电子电器架构 --- 电子电气架构设计方案
  • QC -io 服务器排查报错方式/报错: Failed to convert string to integer of varId variable!“
  • 2.7 Python方法调用机制解析:从描述符到字节码执行
  • 学习C++、QT---03(C++的输入输出、C++的基本数据类型介绍)
  • 【无标题】使用 Chocolatey 安装 WSL 管理工具 LxRunOffline
  • 贪心算法思路详解
  • Mac电脑-Markdown编辑器-Typora
  • 利用nRF54L15-DK的DEBUG OUT接口调试用户设计的ARM处理器系统
  • springboot口腔管理平台
  • 【分布式理论】读确认数与写确认数:分布式一致性的核心概念
  • WPF Style样式 全局样式资源字典
  • 获取 DOM 与 nextTick:Vue 中的 DOM 操作
  • CTF--PhP Web解题(走入CTF)
  • 增量学习ASAP的源码剖析:如何实现人形的运动追踪和全身控制(核心涉及HumanoidVerse中的agents模块)
  • Redis集群部署终极指南:架构选型、生产部署与深度优化
  • 人形机器人_双足行走动力学:本田机械腿的倒立摆模型
  • rt-thread中使用usb官方自带的驱动问题记录