SylixOS 下的消息队列
1、简介
消息队列(Message Queue) 是 Linux 提供的一种进程间通信(IPC)机制,允许进程通过发送和接收消息块来进行数据交换。与管道、共享内存不同,消息队列具有结构化、非阻塞和优先级控制等特点。
MQ 采用链表来实现消息队列,该链表是由系统内核维护,系统中可能有很多的 MQ,每个 MQ 用消息队列描述符(消息队列 ID)来区分。在进行任务间通信时,一个任务将消息加到 MQ 尾端,另一个任务从消息队列中取消息(不一定以先进先出来取消息,也可以按照消息类型字段取消息),这样就实现了任务间的通信。如下 MQ 的模型
消息队列的典型特点如下:
- 异步通信
发送方和接收方不需同时存在,消息可以先入队,稍后由接收方读取。 - 消息有结构
每条消息可以包含一个类型/优先级标识,便于有选择地接收特定类型的消息。 - 支持阻塞与非阻塞模式
接收和发送操作可配置为阻塞或非阻塞,提高通信灵活性。 - 容量限制与排队机制
消息队列有最大消息数和每条消息最大长度限制,超出后发送可能阻塞或失败。 - 跨进程通信(IPC)
支持多个进程之间通信,可用于分布式或模块化系统中。
2、SylixOS 下的消息队列
一个 SylixOS 消息队列必须要调用 API_MsgQueueCreate
函数创建之后才能使用,如果创建成功,API_MsgQueueCreate
函数将返回一个消息队列的句柄。
任务如果需要接收消息,可以调用 API_MsgQueueReceive
函数。发送消息可以调用 API_MsgQueueSend
函数。
SylixOS 支持消息优先级,让一些更紧急的消息得到及时处理。优先级由 API_MsgQueueSendEx2
函数入参控制。
当一个消息队列使用完毕后(并确保以后也不再使用),应该调用 API_MsgQueueDelete
函数将其删除,SylixOS 会回收该消息队列占用的内核资源。
使用消息队列接口时,要注意一些场景。例如,调用 API_MsgQueueSend、API_MsgQueueReceive 要考虑,该函数是否会引起睡眠?(这是由函数入参决定的)。以上只是案例,实际使用时要小心。
这里只讲解部分接口,完整的接口以及实现请阅读 SylixOS 内核源码。
2.1 创建消息队列
LW_API
LW_OBJECT_HANDLE API_MsgQueueCreate (CPCHAR pcName,ULONG ulMaxMsgCounter,size_t stMaxMsgByteSize,ULONG ulOption,LW_OBJECT_ID *pulId)
{....../* 函数返回的 pulId 就是 event ID,用来索引指定事件 */__KERNEL_MODE_PROC(pevent = _Allocate_Event_Object(); /* 申请事件 */);if (!pevent) {__KERNEL_MODE_PROC(_Free_MsgQueue_Object(pmsgqueue););_DebugHandle(__ERRORMESSAGE_LEVEL, "there is no ID to build a msgqueue.\r\n");_ErrorHandle(ERROR_KERNEL_LOW_MEMORY);return (LW_OBJECT_HANDLE_INVALID);}/* 初始化消息缓冲区,消息缓冲区大小是不固定的,根据函数入参来决定 */stMaxMsgByteSizeReal = ROUND_UP(stMaxMsgByteSize, sizeof(size_t))+ sizeof(LW_CLASS_MSGNODE); /* 每条消息缓存大小 */stHeapAllocateByteSize = (size_t)ulMaxMsgCounter* stMaxMsgByteSizeReal; /* 需要分配的内存总大小 */pvMemAllocate = __KHEAP_ALLOC(stHeapAllocateByteSize); /* 申请内存 */if (!pvMemAllocate) {__KERNEL_MODE_PROC(_Free_MsgQueue_Object(pmsgqueue);_Free_Event_Object(pevent););_DebugHandle(__ERRORMESSAGE_LEVEL, "kernel low memory.\r\n");_ErrorHandle(ERROR_KERNEL_LOW_MEMORY);return (LW_OBJECT_HANDLE_INVALID);}if (pcName) { /* 拷贝名字 */lib_strcpy(pevent->EVENT_cEventName, pcName);} else {pevent->EVENT_cEventName[0] = PX_EOS; /* 清空名字 */}pmsgqueue->MSGQUEUE_pvBuffer = pvMemAllocate;pmsgqueue->MSGQUEUE_stMaxBytes = stMaxMsgByteSize;/* 这里会将申请到的消息缓冲区,根号每个消息大小,组成一个消息链表 */_MsgQueueClear(pmsgqueue, ulMaxMsgCounter); /* 缓存区准备好 */......
}
2.2 发送消息队列
LW_API
ULONG API_MsgQueueSend2 (LW_OBJECT_HANDLE ulId,const PVOID pvMsgBuffer,size_t stMsgLen,ULONG ulTimeout)
{....../* 通过事件 ID 找到对应事件,进而找到对应的消息队列 */pmsgqueue = (PLW_CLASS_MSGQUEUE)pevent->EVENT_pvPtr;if (stMsgLen > pmsgqueue->MSGQUEUE_stMaxBytes) { /* 长度太长 */__KERNEL_EXIT_IRQ(iregInterLevel); /* 退出内核 */_DebugHandle(__ERRORMESSAGE_LEVEL, "ulMsgLen invalidate.\r\n");_ErrorHandle(ERROR_MSGQUEUE_MSG_LEN);return (ERROR_MSGQUEUE_MSG_LEN);}if (_EventWaitNum(EVENT_MSG_Q_R, pevent)) { /* 有任务在等待消息 */BOOL bSendOk = LW_TRUE;/* 尝试将阻塞在接收该消息的任务由阻塞态,修改为就绪态 */if (pevent->EVENT_ulOption & LW_OPTION_WAIT_PRIORITY) { /* 优先级等待队列 */_EVENT_DEL_Q_PRIORITY(EVENT_MSG_Q_R, ppringList); /* 激活优先级等待线程 */ptcb = _EventReadyPriorityLowLevel(pevent, LW_NULL, ppringList);} else {_EVENT_DEL_Q_FIFO(EVENT_MSG_Q_R, ppringList); /* 激活FIFO等待线程 */ptcb = _EventReadyFifoLowLevel(pevent, LW_NULL, ppringList);}if ((stMsgLen > ptcb->TCB_stMaxByteSize) && !(ptcb->TCB_ulRecvOption & LW_OPTION_NOERROR)) { /* 是否允许自动截断 */*ptcb->TCB_pstMsgByteSize = 0;ptcb->TCB_stMaxByteSize = 0;bSendOk = LW_FALSE;} else {stRealLen = (stMsgLen < ptcb->TCB_stMaxByteSize) ?(stMsgLen) : (ptcb->TCB_stMaxByteSize); /* 确定信息拷贝长短 */*ptcb->TCB_pstMsgByteSize = stRealLen; /* 保存长短 *//* * 注意,这里会将 msg 信息,直接拷贝到线程的 TCB 中 msg 指向的 buffer 中* TCB 中 msg 指针,是在任务尝试接收消息被阻塞时设置的。msg 指针直接指向阻塞任务的接收消息缓冲区* 这样可以省去一次拷贝操作,不需要在把消息拷贝到消息队列中*/lib_memcpy(ptcb->TCB_pvMsgQueueMessage, /* 传递消息 */pvMsgBuffer, stRealLen);}KN_INT_ENABLE(iregInterLevel); /* 使能中断 *//* 尝试将阻塞在接收该消息的任务,分配 cpu 运行 */_EventReadyHighLevel(ptcb,LW_THREAD_STATUS_MSGQUEUE,LW_SCHED_ACT_INTERRUPT); /* 处理 TCB */MONITOR_EVT_LONG2(MONITOR_EVENT_ID_MSGQ, MONITOR_EVENT_MSGQ_POST, ulId, ptcb->TCB_ulId, LW_NULL);/* 这里会引起调度,尝试调度阻塞在接收该消息的任务 */__KERNEL_EXIT(); /* 退出内核 */if (bSendOk == LW_FALSE) {goto __re_send; /* 重新发送 */}return (ERROR_NONE);} else {if (pevent->EVENT_ulCounter < pevent->EVENT_ulMaxCounter) { /* 检查是否还有空间加 */pevent->EVENT_ulCounter++;/* 这里会将消息放到消息队列缓冲区 */_MsgQueuePut(pmsgqueue, pvMsgBuffer, stMsgLen, EVENT_MSG_Q_PRIO_LOW); /* 保存消息 */__KERNEL_EXIT_IRQ(iregInterLevel); /* 退出内核 */return (ERROR_NONE);} else {/* 下面就是处理,如果消息队列已经满了的情况,选择是否需要阻塞,等待消息队列有空闲 */if ((ulTimeout == LW_OPTION_NOT_WAIT) || LW_CPU_GET_CUR_NESTING()) { /* 不需要等待 */__KERNEL_EXIT_IRQ(iregInterLevel); /* 退出内核 */_ErrorHandle(ERROR_MSGQUEUE_FULL);return (ERROR_MSGQUEUE_FULL);}LW_TCB_GET_CUR(ptcbCur); /* 当前任务控制块 */ptcbCur->TCB_iPendQ = EVENT_MSG_Q_S;ptcbCur->TCB_usStatus |= LW_THREAD_STATUS_MSGQUEUE; /* 写状态位,开始等待 */ptcbCur->TCB_ucWaitTimeout = LW_WAIT_TIME_CLEAR; /* 清空等待时间 */if (ulTimeout == LW_OPTION_WAIT_INFINITE) { /* 是否是无穷等待 */ptcbCur->TCB_ulDelay = 0ul;} else {ptcbCur->TCB_ulDelay = ulTimeout; /* 设置超时时间 */}__KERNEL_TIME_GET_IGNIRQ(ulTimeSave, ULONG); /* 记录系统时间 */if (pevent->EVENT_ulOption & LW_OPTION_WAIT_PRIORITY) {_EVENT_INDEX_Q_PRIORITY(ptcbCur->TCB_ucPriority, ucPriorityIndex);_EVENT_PRIORITY_Q_PTR(EVENT_MSG_Q_S, ppringList, ucPriorityIndex);ptcbCur->TCB_ppringPriorityQueue = ppringList; /* 记录等待队列位置 */_EventWaitPriority(pevent, ppringList); /* 加入优先级等待表 */} else { /* 按 FIFO 等待 */_EVENT_FIFO_Q_PTR(EVENT_MSG_Q_S, ppringList); /* 确定 FIFO 队列的位置 */_EventWaitFifo(pevent, ppringList); /* 加入 FIFO 等待表 */}KN_INT_ENABLE(iregInterLevel); /* 使能中断 */ulEventOption = pevent->EVENT_ulOption;iSchedRet = __KERNEL_EXIT(); /* 调度器解锁 */...... }}
}
SylixOS 消息队列还支持紧急消息的发送,支持消息的优先级,保证了某些异常情况下的安全。函数接口为:API_MsgQueueSendEx2
,通过入参 ulOption
来控制消息的优先级。
/*********************************************************************************************************消息队列发送选项 (URGENT 与 BROADCAST 不能同时设置)
*********************************************************************************************************/#define LW_OPTION_URGENT 0x00000001 /* 消息队列紧急消息发送 */
#define LW_OPTION_URGENT_0 LW_OPTION_URGENT/* 最高紧急优先级 */
#define LW_OPTION_URGENT_1 0x00000011
#define LW_OPTION_URGENT_2 0x00000021
#define LW_OPTION_URGENT_3 0x00000031
#define LW_OPTION_URGENT_4 0x00000041
#define LW_OPTION_URGENT_5 0x00000051
#define LW_OPTION_URGENT_6 0x00000061
#define LW_OPTION_URGENT_7 0x00000071 /* 最低紧急优先级 */#define LW_OPTION_BROADCAST 0x00000002 /* 消息队列广播发送 */LW_API ULONG API_MsgQueueSendEx2(LW_OBJECT_HANDLE ulId,const PVOID pvMsgBuffer,size_t stMsgLen,ULONG ulTimeout,ULONG ulOption);/* 带有超时的发送消息高级接口 */
关于这里的消息优先级,我们讲解一下 _MsgQueuePut
函数。
/*********************************************************************************************************
** 函数名称: _MsgQueuePut
** 功能描述: 向消息队列中写入一个消息
** 输 入 : pmsgqueue 消息队列控制块
** : pvMsgBuffer 消息缓冲区
** : stMsgLen 消息长度
** : uiPrio 消息优先级
** 输 出 : NONE
** 全局变量:
** 调用模块:
*********************************************************************************************************/
VOID _MsgQueuePut (PLW_CLASS_MSGQUEUE pmsgqueue,PVOID pvMsgBuffer,size_t stMsgLen, UINT uiPrio)
{PLW_CLASS_MSGNODE pmsgnode;/* 从消息队列中的空闲消息链表上,找到一个空闲消息块 */pmsgnode = (PLW_CLASS_MSGNODE)_list_mono_allocate(&pmsgqueue->MSGQUEUE_pmonoFree);pmsgnode->MSGNODE_stMsgLen = stMsgLen;lib_memcpy((PVOID)(pmsgnode + 1), pvMsgBuffer, stMsgLen); /* 拷贝消息 *//* 这里会设置优先级位图,就是为了确保,从消息队列中取数据时,取的一定是优先级最高的消息 */if (pmsgqueue->MSGQUEUE_pmonoHeader[uiPrio] == LW_NULL) {pmsgqueue->MSGQUEUE_uiMap |= (1 << uiPrio); /* 设置优先级位图 */}/* 将消息块链接在指定的优先级链表上(尾插) */_list_mono_free_seq(&pmsgqueue->MSGQUEUE_pmonoHeader[uiPrio], &pmsgqueue->MSGQUEUE_pmonoTail[uiPrio], &pmsgnode->MSGNODE_monoManage);
}
2.3 接收消息队列
/*********************************************************************************************************
** 函数名称: _MsgQueueGet
** 功能描述: 从消息队列中获得一个消息
** 输 入 : pmsgqueue 消息队列控制块
** : pvMsgBuffer 接收缓冲区
** : stMaxByteSize 缓冲区大小
** : pstMsgLen 获得消息的长度
** 输 出 : NONE
** 全局变量:
** 调用模块:
*********************************************************************************************************/
VOID _MsgQueueGet (PLW_CLASS_MSGQUEUE pmsgqueue,PVOID pvMsgBuffer,size_t stMaxByteSize,size_t *pstMsgLen)
{INT iQ;PLW_CLASS_MSGNODE pmsgnode;/* 在当前的消息队列中,找到优先级最高的消息块链表 */iQ = archFindLsb(pmsgqueue->MSGQUEUE_uiMap) - 1; /* 计算优先级 */_BugHandle(!pmsgqueue->MSGQUEUE_pmonoHeader[iQ], LW_TRUE, "buffer is empty!\r\n");/* 从消息链表上取下一个消息块,取的顺序是从 header 开始取,即相同优先级,按照先入先出顺序 */pmsgnode = (PLW_CLASS_MSGNODE)_list_mono_allocate_seq(&pmsgqueue->MSGQUEUE_pmonoHeader[iQ],&pmsgqueue->MSGQUEUE_pmonoTail[iQ]);if (pmsgqueue->MSGQUEUE_pmonoHeader[iQ] == LW_NULL) {pmsgqueue->MSGQUEUE_uiMap &= ~(1 << iQ); /* 清除优先级位图 */}*pstMsgLen = (stMaxByteSize < pmsgnode->MSGNODE_stMsgLen) ? (stMaxByteSize) : (pmsgnode->MSGNODE_stMsgLen); /* 确定拷贝信息数量 */lib_memcpy(pvMsgBuffer, (PVOID)(pmsgnode + 1), *pstMsgLen); /* 拷贝消息 *//* 将该消息块从消息链表中删除,并回收到消息队列的空闲链表上 */_list_mono_free(&pmsgqueue->MSGQUEUE_pmonoFree, &pmsgnode->MSGNODE_monoManage);
}
API_MsgQueueReceive
函数中的重点,还是 _MsgQueueGet
函数。
LW_API
ULONG API_MsgQueueReceive (LW_OBJECT_HANDLE ulId,PVOID pvMsgBuffer,size_t stMaxByteSize,size_t *pstMsgLen,ULONG ulTimeout){......pevent = &_K_eventBuffer[usIndex];iregInterLevel = __KERNEL_ENTER_IRQ(); /* 进入内核 */if (_Event_Type_Invalid(usIndex, LW_TYPE_EVENT_MSGQUEUE)) {__KERNEL_EXIT_IRQ(iregInterLevel); /* 退出内核 */_ErrorHandle(ERROR_MSGQUEUE_TYPE);return (ERROR_MSGQUEUE_TYPE);}/* 通过指定事件 ID,获得消息队列结构 */pmsgqueue = (PLW_CLASS_MSGQUEUE)pevent->EVENT_pvPtr;ptcbCur->TCB_ulRecvOption = LW_OPTION_NOERROR; /* 接收大消息自动截断 *//* 如果消息队列中,消息不为空 */if (pevent->EVENT_ulCounter) { /* 事件有效 */pevent->EVENT_ulCounter--;_MsgQueueGet(pmsgqueue, pvMsgBuffer, stMaxByteSize, pstMsgLen); /* 获得消息 */if (_EventWaitNum(EVENT_MSG_Q_S, pevent)) { /* 有任务在等待写消息 */if (pevent->EVENT_ulOption & LW_OPTION_WAIT_PRIORITY) { /* 优先级等待队列 */_EVENT_DEL_Q_PRIORITY(EVENT_MSG_Q_S, ppringList); /* 激活优先级等待线程 */ptcb = _EventReadyPriorityLowLevel(pevent, LW_NULL, ppringList);} else {_EVENT_DEL_Q_FIFO(EVENT_MSG_Q_S, ppringList); /* 激活FIFO等待线程 */ptcb = _EventReadyFifoLowLevel(pevent, LW_NULL, ppringList);}KN_INT_ENABLE(iregInterLevel); /* 使能中断 */_EventReadyHighLevel(ptcb,LW_THREAD_STATUS_MSGQUEUE,LW_SCHED_ACT_INTERRUPT); /* 处理 TCB */__KERNEL_EXIT(); /* 退出内核 */} else {__KERNEL_EXIT_IRQ(iregInterLevel); /* 退出内核 */}return (ERROR_NONE);}/* 下面就是判断,如果消息队列为空,是否需要阻塞等待 */if (ulTimeout == LW_OPTION_NOT_WAIT) { /* 不等待 */__KERNEL_EXIT_IRQ(iregInterLevel); /* 退出内核 */_ErrorHandle(ERROR_THREAD_WAIT_TIMEOUT); /* 超时 */return (ERROR_THREAD_WAIT_TIMEOUT);}ptcbCur->TCB_pstMsgByteSize = pstMsgLen;ptcbCur->TCB_stMaxByteSize = stMaxByteSize;/* 如果在 receive 消息时被阻塞,则将 buffer 赋值给 TCB 中的 msg 指针。再被唤醒时,数据已经被拷贝到 buffer 中了 */ptcbCur->TCB_pvMsgQueueMessage = pvMsgBuffer; /* 记录信息 */......
}