mq_open系统调用及示例
POSIX 消息队列 (mq_*) 函数详解
1. 函数介绍
POSIX 消息队列是一组用于进程间通信(IPC)的函数,提供了一种可靠的、基于消息的通信机制。可以把消息队列想象成"邮局系统"——发送者将消息放入邮箱(队列),接收者从邮箱中取出消息,就像现实中的邮政服务一样。
与传统的 System V 消息队列相比,POSIX 消息队列具有更好的可移植性和更简洁的 API。它们支持优先级消息、持久化、以及通过文件系统路径名进行命名。
2. 核心函数原型
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>// 核心函数
mqd_t mq_open(const char *name, int oflag, ...);
int mq_close(mqd_t mqdes);
int mq_unlink(const char *name);
int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
int mq_notify(mqd_t mqdes, const struct sigevent *notification);
3. 功能
POSIX 消息队列提供以下功能:
- 创建和打开消息队列
- 发送和接收消息
- 设置和获取队列属性
- 异步通知机制
- 持久化支持
- 优先级消息支持
4. 核心结构体
struct mq_attr
struct mq_attr {long mq_flags; /* 消息队列标志 */long mq_maxmsg; /* 最大消息数 */long mq_msgsize; /* 最大消息大小 */long mq_curmsgs; /* 当前消息数 */
};
struct sigevent (用于通知)
struct sigevent {int sigev_notify; /* 通知类型 */int sigev_signo; /* 信号编号 */union sigval sigev_value; /* 传递给处理函数的数据 */void (*sigev_notify_function)(union sigval); /* 线程函数 */pthread_attr_t *sigev_notify_attributes; /* 线程属性 */
};
5. 消息队列名称
- 名称必须以 ‘/’ 开头
- 长度限制为 NAME_MAX (通常 255 字符)
- 示例:“/my_queue”, “/app/messages”
6. 打开标志 (oflag)
标志 | 说明 |
---|---|
O_RDONLY | 只读打开 |
O_WRONLY | 只写打开 |
O_RDWR | 读写打开 |
O_CREAT | 不存在时创建 |
O_EXCL | 与 O_CREAT 一起使用,如果存在则失败 |
O_NONBLOCK | 非阻塞模式 |
7. 返回值
- mq_open: 成功返回消息队列描述符,失败返回 (mqd_t)-1
- 其他函数: 成功返回 0,失败返回 -1
8. 相关函数
- pthread: 多线程支持
- signal: 信号处理
- fcntl: 文件控制
- unlink: 删除文件
9. 示例代码
示例1:基础用法 - 简单的消息发送和接收
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <errno.h>#define QUEUE_NAME "/example_queue"
#define MAX_MSG_SIZE 256
#define MAX_MSGS 10int main() {mqd_t mq;struct mq_attr attr;char send_buffer[MAX_MSG_SIZE];char recv_buffer[MAX_MSG_SIZE];ssize_t bytes_read;unsigned int priority;printf("=== POSIX 消息队列基础示例 ===\n\n");// 设置消息队列属性attr.mq_flags = 0;attr.mq_maxmsg = MAX_MSGS;attr.mq_msgsize = MAX_MSG_SIZE;attr.mq_curmsgs = 0;// 创建并打开消息队列printf("创建消息队列: %s\n", QUEUE_NAME);mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR, 0644, &attr);if (mq == (mqd_t)-1) {perror("mq_open");exit(1);}printf("✓ 消息队列创建成功\n\n");// 获取并显示队列属性printf("消息队列属性:\n");if (mq_getattr(mq, &attr) == 0) {printf(" 最大消息数: %ld\n", attr.mq_maxmsg);printf(" 最大消息大小: %ld 字节\n", attr.mq_msgsize);printf(" 当前消息数: %ld\n", attr.mq_curmsgs);printf(" 标志: %ld\n", attr.mq_flags);}printf("\n");// 发送消息printf("发送消息:\n");const char* messages[] = {"第一条消息: Hello, World!","第二条消息: 欢迎使用 POSIX 消息队列","第三条消息: 这是优先级消息","第四条消息: 最后一条测试消息"};int priorities[] = {0, 0, 10, 0}; // 优先级 (数值越大优先级越高)for (int i = 0; i < 4; i++) {if (mq_send(mq, messages[i], strlen(messages[i]) + 1, priorities[i]) == 0) {printf(" ✓ 发送消息 %d (优先级 %d): %s\n", i + 1, priorities[i], messages[i]);} else {perror(" ✗ mq_send 失败");}}// 显示发送后队列状态if (mq_getattr(mq, &attr) == 0) {printf("\n发送后队列状态: %ld 条消息\n", attr.mq_curmsgs);}// 接收消息printf("\n接收消息 (按优先级顺序):\n");for (int i = 0; i < 4; i++) {bytes_read = mq_receive(mq, recv_buffer, MAX_MSG_SIZE, &priority);if (bytes_read != -1) {printf(" ✓ 接收消息 %d (优先级 %d, 长度 %zd): %s\n", i + 1, priority, bytes_read, recv_buffer);} else {if (errno == EAGAIN) {printf(" ⚠ 队列为空\n");break;} else {perror(" ✗ mq_receive 失败");break;}}}// 显示接收后队列状态if (mq_getattr(mq, &attr) == 0) {printf("\n接收后队列状态: %ld 条消息\n", attr.mq_curmsgs);}// 关闭消息队列if (mq_close(mq) == 0) {printf("✓ 消息队列关闭成功\n");} else {perror("✗ mq_close 失败");}// 删除消息队列if (mq_unlink(QUEUE_NAME) == 0) {printf("✓ 消息队列删除成功\n");} else {perror("✗ mq_unlink 失败");}printf("\n=== 消息队列特点 ===\n");printf("1. 支持优先级消息 (数值越大优先级越高)\n");printf("2. 消息大小可配置\n");printf("3. 消息数量有限制\n");printf("4. 支持持久化 (直到显式删除)\n");printf("5. 可通过文件系统路径访问\n");return 0;
}
示例2:生产者-消费者模型
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <pthread.h>
#include <errno.h>
#include <time.h>#define QUEUE_NAME "/producer_consumer_queue"
#define MAX_MSG_SIZE 256
#define MAX_MSGS 20
#define NUM_MESSAGES 10// 全局变量
mqd_t mq;
int producer_count = 0;
int consumer_count = 0;
pthread_mutex_t count_mutex = PTHREAD_MUTEX_INITIALIZER;// 生产者线程函数
void* producer_thread(void* arg) {int producer_id = *(int*)arg;char message[MAX_MSG_SIZE];time_t now;printf("生产者 %d 启动\n", producer_id);for (int i = 0; i < NUM_MESSAGES; i++) {// 构造消息time(&now);snprintf(message, sizeof(message), "P%d-MSG%d-TIME:%s", producer_id, i + 1, ctime(&now));// 发送消息 (交替使用不同优先级)unsigned int priority = (i % 3 == 0) ? 5 : 1; // 每第3条高优先级if (mq_send(mq, message, strlen(message) + 1, priority) == 0) {pthread_mutex_lock(&count_mutex);producer_count++;pthread_mutex_unlock(&count_mutex);printf("生产者 %d 发送消息: %s (优先级 %u)\n", producer_id, message, priority);} else {perror("生产者发送失败");}// 随机延迟usleep((rand() % 100 + 1) * 1000); // 1-100ms}printf("生产者 %d 完成\n", producer_id);return NULL;
}// 消费者线程函数
void* consumer_thread(void* arg) {int consumer_id = *(int*)arg;char message[MAX_MSG_SIZE];ssize_t bytes_read;unsigned int priority;printf("消费者 %d 启动\n", consumer_id);while (1) {// 接收消息bytes_read = mq_receive(mq, message, MAX_MSG_SIZE, &priority);if (bytes_read != -1) {pthread_mutex_lock(&count_mutex);consumer_count++;int current_count = consumer_count;pthread_mutex_unlock(&count_mutex);printf("消费者 %d 接收消息 %d (优先级 %u): %s", consumer_id, current_count, priority, message);// 检查是否接收完所有消息if (current_count >= NUM_MESSAGES * 2) { // 2个生产者break;}} else {if (errno == EAGAIN) {// 非阻塞模式下队列为空usleep(10000); // 10mscontinue;} else {perror("消费者接收失败");break;}}// 随机延迟usleep((rand() % 50 + 1) * 1000); // 1-50ms}printf("消费者 %d 完成\n", consumer_id);return NULL;
}int main() {pthread_t producers[2];pthread_t consumers[3];int producer_ids[2] = {1, 2};int consumer_ids[3] = {1, 2, 3};struct mq_attr attr;printf("=== 生产者-消费者消息队列示例 ===\n\n");// 初始化随机数种子srand(time(NULL) + getpid());// 设置消息队列属性attr.mq_flags = 0; // 阻塞模式attr.mq_maxmsg = MAX_MSGS;attr.mq_msgsize = MAX_MSG_SIZE;attr.mq_curmsgs = 0;// 创建消息队列printf("创建消息队列: %s\n", QUEUE_NAME);mq = mq_open(QUEUE_NAME, O_CREAT | O_RDWR | O_NONBLOCK, 0644, &attr);if (mq == (mqd_t)-1) {perror("mq_open");exit(1);}printf("✓ 消息队列创建成功\n\n");// 创建生产者线程printf("创建生产者线程...\n");for (int i = 0; i < 2; i++) {if (pthread_create(&producers[i], NULL, producer_thread, &producer_ids[i]) != 0) {perror("创建生产者线程失败");exit(1);}}// 创建消费者线程printf("创建消费者线程...\n");for (int i = 0; i < 3; i++) {if (pthread_create(&consumers[i], NULL, consumer_thread, &consumer_ids[i]) != 0) {perror("创建消费者线程失败");exit(1);}}// 等待生产者完成printf("等待生产者完成...\n");for (int i = 0; i < 2; i++) {pthread_join(producers[i], NULL);}// 等待消费者完成printf("等待消费者完成...\n");for (int i = 0; i < 3; i++) {pthread_join(consumers[i], NULL);}// 显示统计信息printf("\n=== 统计信息 ===\n");printf("生产消息数: %d\n", producer_count);printf("消费消息数: %d\n", consumer_count);// 显示最终队列状态if (mq_getattr(mq, &attr) == 0) {printf("队列中剩余消息: %ld\n", attr.mq_curmsgs);}// 清理资源if (mq_close(mq) == 0) {printf("✓ 消息队列关闭成功\n");}if (mq_unlink(QUEUE_NAME) == 0) {printf("✓ 消息队列删除成功\n");}printf("\n=== 生产者-消费者模型特点 ===\n");printf("1. 解耦: 生产者和消费者独立运行\n");printf("2. 异步: 生产和消费可以不同步进行\n");printf("3. 缓冲: 消息队列提供缓冲作用\n");printf("4. 负载均衡: 多个消费者可以并行处理\n");printf("5. 可靠性: 消息持久化存储\n");return 0;
}
示例3:完整的消息队列管理系统
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <errno.h>
#include <getopt.h>
#include <signal.h>
#include <time.h>// 配置结构体
struct mq_config {char *queue_name;int max_messages;int max_message_size;int create_queue;int delete_queue;int show_info;int send_message;int receive_message;int list_queues;int priority;char *message_content;int non_blocking;int verbose;
};// 全局变量
volatile sig_atomic_t running = 1;// 信号处理函数
void signal_handler(int sig) {printf("\n收到信号 %d,准备退出...\n", sig);running = 0;
}// 设置信号处理
void setup_signal_handlers() {struct sigaction sa;sa.sa_handler = signal_handler;sigemptyset(&sa.sa_mask);sa.sa_flags = 0;sigaction(SIGINT, &sa, NULL); // Ctrl+Csigaction(SIGTERM, &sa, NULL); // 终止信号
}// 显示消息队列信息
void show_queue_info(mqd_t mq) {struct mq_attr attr;if (mq_getattr(mq, &attr) == 0) {printf("消息队列属性:\n");printf(" 最大消息数: %ld\n", attr.mq_maxmsg);printf(" 最大消息大小: %ld 字节\n", attr.mq_msgsize);printf(" 当前消息数: %ld\n", attr.mq_curmsgs);printf(" 标志: %s\n", (attr.mq_flags & O_NONBLOCK) ? "非阻塞" : "阻塞");} else {perror("获取队列属性失败");}
}// 列出所有消息队列
void list_all_queues() {printf("=== 系统消息队列列表 ===\n");printf("注意: POSIX 消息队列通常在 /dev/mqueue/ 目录下\n");// 尝试列出 /dev/mqueue/ 目录if (access("/dev/mqueue", F_OK) == 0) {printf("系统消息队列目录存在\n");system("ls -la /dev/mqueue/ 2>/dev/null || echo '无法访问 /dev/mqueue/'");} else {printf("系统消息队列目录不存在\n");}printf("\n");
}// 发送消息
int send_message_to_queue(mqd_t mq, const char *message, int priority, int non_blocking) {struct mq_attr attr;// 检查消息大小if (mq_getattr(mq, &attr) == 0) {if (strlen(message) + 1 > (size_t)attr.mq_msgsize) {fprintf(stderr, "错误: 消息大小 (%zu) 超过队列限制 (%ld)\n", strlen(message) + 1, attr.mq_msgsize);return -1;}}// 发送消息if (mq_send(mq, message, strlen(message) + 1, priority) == 0) {printf("✓ 消息发送成功 (优先级 %d): %s\n", priority, message);return 0;} else {if (errno == EAGAIN && non_blocking) {printf("⚠ 队列已满,非阻塞模式下发送失败\n");} else {perror("✗ 消息发送失败");}return -1;}
}// 接收消息
int receive_message_from_queue(mqd_t mq, int non_blocking) {char *buffer;struct mq_attr attr;ssize_t bytes_read;unsigned int priority;// 获取队列属性以确定缓冲区大小if (mq_getattr(mq, &attr) != 0) {perror("获取队列属性失败");return -1;}buffer = malloc(attr.mq_msgsize);if (!buffer) {perror("内存分配失败");return -1;}// 接收消息bytes_read = mq_receive(mq, buffer, attr.mq_msgsize, &priority);if (bytes_read != -1) {printf("✓ 消息接收成功 (优先级 %u, 长度 %zd): %s", priority, bytes_read, buffer);free(buffer);return 0;} else {if (errno == EAGAIN && non_blocking) {printf("⚠ 队列为空,非阻塞模式下接收失败\n");} else {perror("✗ 消息接收失败");}free(buffer);return -1;}
}// 创建消息队列
mqd_t create_message_queue(const char *name, int max_msgs, int max_size, int non_blocking) {struct mq_attr attr;int flags = O_CREAT | O_RDWR;if (non_blocking) {flags |= O_NONBLOCK;}attr.mq_flags = non_blocking ? O_NONBLOCK : 0;attr.mq_maxmsg = max_msgs;attr.mq_msgsize = max_size;attr.mq_curmsgs = 0;mqd_t mq = mq_open(name, flags, 0644, &attr);if (mq == (mqd_t)-1) {perror("创建消息队列失败");return (mqd_t)-1;}printf("✓ 消息队列创建成功: %s\n", name);return mq;
}// 打开现有消息队列
mqd_t open_existing_queue(const char *name, int non_blocking) {int flags = O_RDWR;if (non_blocking) {flags |= O_NONBLOCK;}mqd_t mq = mq_open(name, flags);if (mq == (mqd_t)-1) {perror("打开消息队列失败");return (mqd_t)-1;}printf("✓ 消息队列打开成功: %s\n", name);return mq;
}// 显示帮助信息
void show_help(const char *program_name) {printf("用法: %s [选项]\n", program_name);printf("\n选项:\n");printf(" -n, --name=NAME 消息队列名称 (以 / 开头)\n");printf(" -c, --create 创建消息队列\n");printf(" -d, --delete 删除消息队列\n");printf(" -i, --info 显示队列信息\n");printf(" -l, --list 列出所有队列\n");printf(" -s, --send=MESSAGE 发送消息\n");printf(" -r, --receive 接收消息\n");printf(" -p, --priority=NUM 消息优先级 (默认 0)\n");printf(" -m, --max-msgs=NUM 最大消息数 (创建时使用)\n");printf(" -z, --max-size=NUM 最大消息大小 (创建时使用)\n");printf(" -b, --non-blocking 非阻塞模式\n");printf(" -v, --verbose 详细输出\n");printf(" -h, --help 显示此帮助信息\n");printf("\n示例:\n");printf(" %s -n /myqueue -c -m 10 -z 256 # 创建队列\n", program_name);printf(" %s -n /myqueue -s \"Hello World\" # 发送消息\n", program_name);printf(" %s -n /myqueue -r # 接收消息\n", program_name);printf(" %s -n /myqueue -i # 显示队列信息\n", program_name);printf(" %s -n /myqueue -d # 删除队列\n", program_name);printf(" %s -l # 列出所有队列\n", program_name);
}int main(int argc, char *argv[]) {struct mq_config config = {.queue_name = NULL,.max_messages = 10,.max_message_size = 256,.create_queue = 0,.delete_queue = 0,.show_info = 0,.send_message = 0,.receive_message = 0,.list_queues = 0,.priority = 0,.message_content = NULL,.non_blocking = 0,.verbose = 0};printf("=== POSIX 消息队列管理系统 ===\n\n");// 解析命令行参数static struct option long_options[] = {{"name", required_argument, 0, 'n'},{"create", no_argument, 0, 'c'},{"delete", no_argument, 0, 'd'},{"info", no_argument, 0, 'i'},{"list", no_argument, 0, 'l'},{"send", required_argument, 0, 's'},{"receive", no_argument, 0, 'r'},{"priority", required_argument, 0, 'p'},{"max-msgs", required_argument, 0, 'm'},{"max-size", required_argument, 0, 'z'},{"non-blocking", no_argument, 0, 'b'},{"verbose", no_argument, 0, 'v'},{"help", no_argument, 0, 'h'},{0, 0, 0, 0}};int opt;while ((opt = getopt_long(argc, argv, "n:cdils:rp:m:z:bvh", long_options, NULL)) != -1) {switch (opt) {case 'n':config.queue_name = optarg;break;case 'c':config.create_queue = 1;break;case 'd':config.delete_queue = 1;break;case 'i':config.show_info = 1;break;case 'l':config.list_queues = 1;break;case 's':config.send_message = 1;config.message_content = optarg;break;case 'r':config.receive_message = 1;break;case 'p':config.priority = atoi(optarg);break;case 'm':config.max_messages = atoi(optarg);break;case 'z':config.max_message_size = atoi(optarg);break;case 'b':config.non_blocking = 1;break;case 'v':config.verbose = 1;break;case 'h':show_help(argv[0]);return 0;default:fprintf(stderr, "使用 '%s --help' 查看帮助信息\n", argv[0]);return 1;}}// 设置信号处理setup_signal_handlers();// 显示系统信息if (config.verbose) {printf("系统信息:\n");printf(" 当前用户 UID: %d\n", getuid());printf(" 当前进程 PID: %d\n", getpid());printf(" 消息队列支持: ");system("ls /dev/mqueue/ >/dev/null 2>&1 && echo '是' || echo '否'");printf("\n");}// 列出所有队列if (config.list_queues) {list_all_queues();if (!config.queue_name && !config.create_queue && !config.delete_queue &&!config.show_info && !config.send_message && !config.receive_message) {return 0;}}// 如果没有指定队列名称且需要操作队列if (!config.queue_name && (config.create_queue || config.delete_queue || config.show_info || config.send_message || config.receive_message)) {fprintf(stderr, "错误: 需要指定消息队列名称\n");fprintf(stderr, "使用 '%s --help' 查看帮助信息\n", argv[0]);return 1;}// 处理队列操作mqd_t mq = (mqd_t)-1;if (config.create_queue) {mq = create_message_queue(config.queue_name, config.max_messages, config.max_message_size, config.non_blocking);if (mq == (mqd_t)-1) {return 1;}if (config.show_info) {show_queue_info(mq);}} else if (config.queue_name) {// 打开现有队列mq = open_existing_queue(config.queue_name, config.non_blocking);if (mq == (mqd_t)-1) {return 1;}}// 显示队列信息if (config.show_info && mq != (mqd_t)-1) {show_queue_info(mq);}// 发送消息if (config.send_message && config.message_content && mq != (mqd_t)-1) {send_message_to_queue(mq, config.message_content, config.priority, config.non_blocking);}// 接收消息if (config.receive_message && mq != (mqd_t)-1) {if (config.non_blocking) {receive_message_from_queue(mq, config.non_blocking);} else {printf("等待接收消息 (按 Ctrl+C 退出)...\n");while (running) {if (receive_message_from_queue(mq, config.non_blocking) == -1) {if (errno != EAGAIN) {break;}}if (!config.non_blocking) {sleep(1); // 阻塞模式下定期检查}}}}// 删除队列if (config.delete_queue && config.queue_name) {if (mq_unlink(config.queue_name) == 0) {printf("✓ 消息队列删除成功: %s\n", config.queue_name);} else {perror("✗ 消息队列删除失败");}}// 关闭队列if (mq != (mqd_t)-1) {if (mq_close(mq) == 0) {if (config.verbose) {printf("✓ 消息队列关闭成功\n");}} else {perror("✗ 消息队列关闭失败");}}// 显示使用建议printf("\n=== POSIX 消息队列使用建议 ===\n");printf("适用场景:\n");printf("1. 进程间通信 (IPC)\n");printf("2. 生产者-消费者模式\n");printf("3. 异步消息处理\n");printf("4. 系统服务通信\n");printf("5. 微服务架构\n");printf("\n");printf("优势:\n");printf("1. 可靠性: 消息持久化存储\n");printf("2. 优先级: 支持消息优先级\n");printf("3. 可移植: POSIX 标准\n");printf("4. 灵活性: 支持阻塞和非阻塞模式\n");printf("5. 安全性: 通过文件系统权限控制\n");printf("\n");printf("注意事项:\n");printf("1. 需要链接实时库: -lrt\n");printf("2. 队列名称必须以 / 开头\n");printf("3. 消息大小和数量有限制\n");printf("4. 需要适当权限才能创建/删除队列\n");printf("5. 应该及时关闭和清理队列资源\n");return 0;
}
编译和运行说明
# 编译示例程序(需要链接实时库)
gcc -o mq_example1 example1.c -lrt
gcc -o mq_example2 example2.c -lrt -lpthread
gcc -o mq_example3 example3.c -lrt -lpthread# 运行示例
./mq_example1
./mq_example2
./mq_example3 --help# 基本操作示例
./mq_example3 -n /test_queue -c -m 5 -z 128
./mq_example3 -n /test_queue -s "Hello, Message Queue!"
./mq_example3 -n /test_queue -r
./mq_example3 -n /test_queue -i
./mq_example3 -n /test_queue -d# 列出所有队列
./mq_example3 -l
系统要求检查
# 检查系统支持
ls /dev/mqueue/ 2>/dev/null || echo "消息队列目录不存在"# 检查内核配置
grep -i mq /boot/config-$(uname -r)# 检查库支持
ldd --version# 查看系统限制
ulimit -a | grep -i msg
cat /proc/sys/fs/mqueue/
重要注意事项
1. 编译要求: 需要链接实时库 -lrt
2. 权限要求: 创建/删除队列通常需要适当权限
3. 名称规范: 队列名称必须以 ‘/’ 开头
4. 资源限制: 受系统消息队列限制约束
5. 清理责任: 应该及时关闭和删除队列
6. 线程安全: 消息队列描述符在多线程间共享是安全的
实际应用场景
1. 微服务通信: 服务间异步消息传递
2. 日志系统: 异步日志记录
3. 任务队列: 后台任务处理
4. 事件驱动: 事件通知和处理
5. 数据流: 实时数据处理管道
6. 系统监控: 状态变更通知
最佳实践
// 安全的消息队列操作函数
mqd_t safe_mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr) {mqd_t mq = mq_open(name, oflag, mode, attr);if (mq == (mqd_t)-1) {switch (errno) {case EACCES:fprintf(stderr, "权限不足访问队列: %s\n", name);break;case EEXIST:fprintf(stderr, "队列已存在: %s\n", name);break;case ENOENT:fprintf(stderr, "队列不存在: %s\n", name);break;case EINVAL:fprintf(stderr, "无效的队列名称或参数: %s\n", name);break;default:perror("mq_open 失败");break;}}return mq;
}// 可靠的消息发送函数
int reliable_mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio, int timeout_seconds) {struct timespec timeout;int result;if (timeout_seconds > 0) {clock_gettime(CLOCK_REALTIME, &timeout);timeout.tv_sec += timeout_seconds;result = mq_timedsend(mqdes, msg_ptr, msg_len, msg_prio, &timeout);} else {result = mq_send(mqdes, msg_ptr, msg_len, msg_prio);}return result;
}// 带重试的消息接收函数
ssize_t retry_mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio, int max_retries) {ssize_t result;int retries = 0;while (retries < max_retries) {result = mq_receive(mqdes, msg_ptr, msg_len, msg_prio);if (result != -1) {return result; // 成功接收}if (errno == EAGAIN) {retries++;usleep(100000); // 100ms 延迟后重试} else {break; // 其他错误,不再重试}}return result;
}
这些示例展示了 POSIX 消息队列的各种使用方法,从基础的消息发送接收到完整的管理系统,帮助你全面掌握 Linux 系统中的消息队列机制。