Linux 消息队列接收与处理线程实现
下面是一个完整的 C 语言实现,创建一个线程来接收消息队列中的数据,缓存到队列中,然后逐个处理。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <errno.h>
#include <unistd.h>// 消息结构体
typedef struct {long mtype; // 消息类型char mtext[1024]; // 消息内容
} Message;// 数据节点结构
typedef struct DataNode {Message message;struct DataNode *next;
} DataNode;// 线程共享数据结构
typedef struct {pthread_mutex_t lock; // 互斥锁pthread_cond_t cond; // 条件变量DataNode *head; // 队列头指针DataNode *tail; // 队列尾指针int msgq_id; // 消息队列IDint running; // 线程运行标志
} ThreadData;// 初始化线程共享数据
int init_thread_data(ThreadData *data, int msgq_id) {data->head = NULL;data->tail = NULL;data->msgq_id = msgq_id;data->running = 1;if (pthread_mutex_init(&data->lock, NULL) {perror("pthread_mutex_init failed");return -1;}if (pthread_cond_init(&data->cond, NULL)) {perror("pthread_cond_init failed");pthread_mutex_destroy(&data->lock);return -1;}return 0;
}// 清理线程共享数据
void cleanup_thread_data(ThreadData *data) {pthread_mutex_lock(&data->lock);// 清空队列DataNode *current = data->head;while (current != NULL) {DataNode *temp = current;current = current->next;free(temp);}data->head = NULL;data->tail = NULL;pthread_mutex_unlock(&data->lock);pthread_mutex_destroy(&data->lock);pthread_cond_destroy(&data->cond);
}// 添加消息到队列
void enqueue_message(ThreadData *data, const Message *msg) {DataNode *node = (DataNode *)malloc(sizeof(DataNode));if (!node) {perror("malloc failed");return;}node->message = *msg;node->next = NULL;pthread_mutex_lock(&data->lock);if (data->tail == NULL) {data->head = node;data->tail = node;} else {data->tail->next = node;data->tail = node;}// 通知有数据到达pthread_cond_signal(&data->cond);pthread_mutex_unlock(&data->lock);
}// 从队列获取消息
int dequeue_message(ThreadData *data, Message *msg) {pthread_mutex_lock(&data->lock);// 等待队列不为空while (data->head == NULL && data->running) {pthread_cond_wait(&data->cond, &data->lock);}if (!data->running) {pthread_mutex_unlock(&data->lock);return -1;}DataNode *node = data->head;*msg = node->message;data->head = node->next;if (data->head == NULL) {data->tail = NULL;}pthread_mutex_unlock(&data->lock);free(node);return 0;
}// 处理消息的函数
void process_message(const Message *msg) {printf("Processing message type %ld: %s\n", msg->mtype, msg->mtext);// 这里添加实际的消息处理逻辑
}// 消息处理线程函数
void *process_thread_func(void *arg) {ThreadData *data = (ThreadData *)arg;Message msg;while (1) {if (dequeue_message(data, &msg) {break; // 线程终止}process_message(&msg);}return NULL;
}// 消息接收线程函数
void *receive_thread_func(void *arg) {ThreadData *data = (ThreadData *)arg;Message msg;while (data->running) {// 接收消息 (阻塞方式)ssize_t len = msgrcv(data->msgq_id, &msg, sizeof(msg.mtext), 0, 0);if (len == -1) {if (errno == EINTR) {continue; // 被信号中断,继续}perror("msgrcv failed");break;}msg.mtext[len] = '\0'; // 确保字符串结束// 将消息加入队列enqueue_message(data, &msg);}// 通知处理线程退出pthread_mutex_lock(&data->lock);pthread_cond_signal(&data->cond);pthread_mutex_unlock(&data->lock);return NULL;
}int main() {// 创建或获取消息队列key_t key = ftok(".", 'a');if (key == -1) {perror("ftok failed");return 1;}int msgq_id = msgget(key, IPC_CREAT | 0666);if (msgq_id == -1) {perror("msgget failed");return 1;}printf("Message queue created with id: %d\n", msgq_id);// 初始化线程共享数据ThreadData thread_data;if (init_thread_data(&thread_data, msgq_id)) {return 1;}// 创建接收线程和处理线程pthread_t receive_thread, process_thread;if (pthread_create(&receive_thread, NULL, receive_thread_func, &thread_data)) {perror("pthread_create receive_thread failed");cleanup_thread_data(&thread_data);return 1;}if (pthread_create(&process_thread, NULL, process_thread_func, &thread_data)) {perror("pthread_create process_thread failed");thread_data.running = 0;pthread_join(receive_thread, NULL);cleanup_thread_data(&thread_data);return 1;}// 主线程等待用户输入退出printf("Press Enter to exit...\n");getchar();// 设置停止标志thread_data.running = 0;// 唤醒可能正在等待的线程pthread_mutex_lock(&thread_data.lock);pthread_cond_signal(&thread_data.cond);pthread_mutex_unlock(&thread_data.lock);// 等待线程结束pthread_join(receive_thread, NULL);pthread_join(process_thread, NULL);// 清理资源cleanup_thread_data(&thread_data);// 删除消息队列 (可选)if (msgctl(msgq_id, IPC_RMID, NULL) == -1) {perror("msgctl IPC_RMID failed");}return 0;
}
代码说明
1. 消息队列结构:
◦ 使用标准的 System V 消息队列
◦ 消息类型为 Message 结构体
2. 线程安全队列:
◦ 使用链表实现 FIFO 队列
◦ 使用互斥锁 (pthread_mutex_t) 保护共享数据
◦ 使用条件变量 (pthread_cond_t) 实现生产者-消费者模型
3. 线程设计:
◦ 接收线程: 从消息队列接收消息并放入处理队列
◦ 处理线程: 从处理队列取出消息并处理
4. 同步机制:
◦ 当队列为空时,处理线程等待条件变量
◦ 当有新消息到达时,接收线程通知条件变量
5. 优雅退出:
◦ 设置 running 标志位控制线程退出
◦ 确保资源正确释放
编译与运行
编译命令:
gcc -o msgq_processor msgq_processor.c -lpthread
运行:
./msgq_processor
扩展建议
1. 可以添加更复杂的错误处理机制
2. 可以增加队列最大长度限制
3. 可以添加统计功能,如处理消息数量统计
4. 可以根据需要修改消息处理函数 process_message() 的实现
这个实现提供了基本的框架,你可以根据实际需求进行修改和扩展。