进程间通信:消息队列
目录
一、引言
二、消息队列的基本概念
2.1 什么是消息队列
2.2 消息队列的核心特性
三、消息队列的核心接口
3.1 创建或获取消息队列——msgget
3.2 发送消息——msgsnd
3.3 接收消息——msgrcv
3.4 控制管理(删除)——msgctl
四、消息队列实现进程间通信
五、结语
一、引言
进程间通信(Inter - Process Communication,IPC)是指在多进程环境下,不同进程之间进行信息交换、协同工作的机制和方法。通过上图可以看到,进程间通信的方式有很多种,管道、消息队列、共享内存……各有各的适用场景。本文聚焦的是如何通过消息队列来实现进程之间的通信。
二、消息队列的基本概念
2.1 什么是消息队列
消息队列(Message Queue)是一种先进先出(FIFO)的数据结构,用于解耦消息生产者与消费者。在进程间通信(IPC)中,消息队列作为内核持久化的通信通道,允许任意数量的进程在非关联状态下交换数据。
说人话就是,消息队列中的消息是按照消息进入队列的先后顺序来消费的,先进来的消息要被先消费。在消费者尚未运行或者还未读取消息队列中的消息的情况下,消息的生产者依然可以往消息队列中写入消息(消息队列未满),消息的生产者和消费者互不影响。所谓的内核持久化的通信通道就是消息队列的生命周期随内核,在生产者和消费者都退出后,消息队列依然存在,除非显示的删除消息队列或关闭系统。
2.2 消息队列的核心特性
特性 | 描述 |
异步通信 | 发送者和接收者不需要同时运行 |
队列存储 | 消息按发送顺序存储(FIFO) |
类型识别 | 消息可分类,支持选择性接收 |
内核持久化 | 消息在进程终止后依然存在 |
大容量通信 | 支持大型数据传输(系统参数限制) |
这里解释一下类型识别。存储在消息队列中的消息是带有类型的,它由一个结构体组织起来,这个结构体msgbuf如下:
struct msgbuf {
long mtype; // 消息类型
char mtext[1]; // 消息内容(可变长度)
};
插入到消息队列中的就是一个个msgbuf结构体,然后通过链表将这些结构体管理起来。
三、消息队列的核心接口
这里谈的消息队列,是System V版本的消息队列,它的核心接口主要有以下四个:
3.1 创建或获取消息队列——msgget
#include <sys/ipc.h>
#include <sys/msg.h>int msgget(key_t key, int msgflag);
msgget函数,它用于创建一个新的消息队列,或者获取一个已经存在的消息队列的标识符。
key_t key:这是一个键值,用于标识消息队列。key可以通过ftok()函数生成,我们也可以自己指定一个整数值做为key。但是比较推荐前者,原因在于ftok函数,它根据文件路径和项目标识符生成一个在系统范围内唯一的键值,避免冲突。下面就顺带介绍一下ftok函数。
#include <sys/ipc.h>
key_t ftok(const char *pathname, int proj_id);
pathname:一个指向文件路径的指针,该文件必须存在且可访问。
proj_id:一个项目标识符,通常是一个字符的 ASCII 值(范围为 0 到 255 )
msgflag:用于指定操作的权限和行为。
- 常用的标志位有IPC_CREAT和IPC_EXCL。
- IPC_CREAT:如果指定的键值不存在对应的消息队列,则创建一个新的消息队列。
- IPC_EXCL:与 IPC_CREAT 一起使用。如果消息队列已存在,则返回错误。
- 权限位(如 0666 ):指定消息队列的访问权限(读写权限),类似于文件权限。
返回值
- 成功:返回消息队列的标识符(非负整数)。
- 失败:返回-1,并设置 errno 以指示错误原因。
3.2 发送消息——msgsnd
#include <sys/msg.h>
int msgsnd(int msqid, const void msgp[.msgsz], size_t msgsz, int msgflg);
msgsnd 是一个用于向消息队列发送消息的系统调用函数。它将消息发送到指定的消息队列中。
int msqid :消息队列的标识符,由 msgget 函数返回。
const void *msgp :指向结构体的指针,比如指向msgbuf。
size_t msgsz :消息正文的大小(以字节为单位)。这个值必须大于 0,并且不能超过系统定义的最大消息大小( msgmax )。不包含结构体中类型成员的大小,仅仅是正文的大小。
int msgflg:控制消息发送行为的标志。常用的标志包括:
- 0 :默认行为,如果消息队列已满,调用将阻塞,直到消息被发送或发生错误。
- IPC_NOWAIT :非阻塞模式,如果消息队列已满,调用将立即返回,并设置 errno 为 EAGAIN 。
返回值
- 成功:返回 0 。
- 失败:返回 -1 ,并设置 errno 以指示错误原因。
3.3 接收消息——msgrcv
#include <sys/msg.h>
ssize_t msgrcv(int msqid, void msgp[.msgsz], size_t msgsz, long msgtyp, int msgflg);
msgrcv 是一个用于从消息队列中接收消息的系统调用函数。它可以从指定的消息队列中读取消息,并将其存储到用户提供的缓冲区中。
int msqid :消息队列的标识符,由 msgget 函数返回。
void *msgp :指向消息缓冲区的指针。通常是一个指向 struct msgbuf 或自定义结构体的指针。
size_t msgsz :消息缓冲区的大小(以字节为单位)。这个值必须大于 0,并且不能超过系统定义的最大消息大小( msgmax )。
long msgtyp :指定要接收的消息类型。
int msgflg :控制消息接收行为的标志。常用的标志包括:
- 0 :默认行为,如果消息队列为空,调用将阻塞,直到消息到达或发生错误。
- IPC_NOWAIT :非阻塞模式,如果消息队列为空,调用将立即返回,并设置 errno 为 EAGAIN 。
返回值
- 成功:返回实际读取的消息正文的大小(以字节为单位)。
- 失败:返回 -1 ,并设置 errno 以指示错误原因。
3.4 控制管理(删除)——msgctl
#include <sys/ipc.h>
#include <sys/msg.h>int msgctl(int msqid, int cmd, struct msqid_ds *buf);
msgctl 是一个用于控制消息队列的系统调用函数。它提供了多种操作,包括获取消息队列的状态、修改消息队列的权限和删除消息队列。
int msqid :消息队列的标识符,由 msgget 函数返回。
int cmd :指定要执行的命令。常用的命令包括:
struct msqid_ds *buf :指向 struct msqid_ds 结构体的指针,用于存储或获取消息队列的状态信息。如果 cmd 是 IPC_RMID ,则 buf 可以是 NULL 。
以上就是关于消息队列核心接口的全部讲解,可能会迷惑,把这些接口都用过一遍就好了。
- IPC_STAT :获取消息队列的状态信息,并将其存储到 buf 指向的 struct msqid_ds 结构体中。 IPC_SET :设置消息队列的权限和所有者信息,这些信息从 buf 指向的 struct msqid_ds 结构体中获取。
- IPC_RMID :删除消息队列。
四、消息队列实现进程间通信
简单演示一下进程间是如何通过消息队列来通信的。进程server,创建消息队列,同时负责生产消息并插入到消息队列中,进程client负责消费消息队列中的信息,并在退出时将消息队列删除。
//common.h
#define MSGSIZE 128 //消息大小
#define MSGKEY 1234 //消息队列的key
#define MSGTYPE 1 //消息类型//消息队列结构体
struct my_msgbuf {long mtype;char mtext[MSGSIZE];
};//server.cpp
#include <iostream>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <cstring>
#include <string>
#include "common.h"/** 主函数:创建消息队列并进行消息发送* 使用System V消息队列进行进程间通信*/
int main()
{// 创建消息队列,权限为0666,如果已存在则报错int msgid = msgget(MSGKEY, IPC_CREAT | IPC_EXCL | 0666);// 检查消息队列创建是否成功if (msgid == -1) {std::cerr << "Error: msgget failed" << std::endl;return 2;}// 输出消息队列创建成功的消息及其IDstd::cout << "Step1:created message queue with id:" << msgid << std::endl;// 定义消息缓冲区my_msgbuf sndmsg;// 设置消息类型sndmsg.mtype = MSGTYPE;// 进入消息发送循环while (true) {// 清空消息文本内容memset(sndmsg.mtext, 0, sizeof(sndmsg.mtext));// 提示用户输入消息std::cout << "Step2:waiting for message..." << std::endl << "Server:";// 获取用户输入std::string input;std::getline(std::cin, input);// 将用户输入复制到消息结构中strcpy(sndmsg.mtext, input.c_str());// 发送消息到队列if (msgsnd(msgid, &sndmsg,input.size() + 1, 0) == -1) {std::cerr << "Error: msgsnd failed" << std::endl;return 3;}// 检查是否输入了"exit"命令,如果是则退出循环if (strcmp(sndmsg.mtext, "exit") == 0) break;}return 0;
}//client.cpp
#include <iostream>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <unistd.h>
#include <cstring>
#include "common.h"
/*** 主函数:创建消息队列并接收消息* 当收到"exit"消息时删除消息队列并退出程序*/
int main()
{// 创建或获取消息队列,权限为0666int msgid = msgget(MSGKEY, 0666 | IPC_CREAT);// 检查消息队列创建是否成功if (msgid == -1) {std::cerr << "msgget error" << std::endl;return 1;}// 定义消息缓冲区变量my_msgbuf revmsg;// 无限循环接收消息while (true) {// 从消息队列中接收消息,类型为MSGTYPEif (msgrcv(msgid, &revmsg, sizeof(revmsg.mtext), MSGTYPE, 0) == -1) {std::cerr << "msgrcv error" << std::endl;return 2;}// 检查消息是否为"exit",如果是则删除消息队列并退出循环if (strcmp(revmsg.mtext,"exit") == 0) {msgctl(msgid, IPC_RMID, NULL);std::cout << "msg queue deleted" << std::endl;break;}// 输出接收到的消息内容std::cout << "receive message: " << revmsg.mtext << std::endl;}return 0;
}
五、结语
消息队列有很多种版本,这只是其中一种,本质上差不了多少。若有进一步疑问,随时可深入交流。希望以上内容能为相关需求提供有效参考价值。
完~