会议通信系统核心流程详解(底稿1)
会议通信系统核心流程详解
系统架构概览
一、创建会议室流程
1. 整体流程
2. 进程/线程交互
组件 | 行为 |
---|---|
主进程 | 1. 接收客户端请求 2. 查找空闲房间进程 3. 通过UNIX套接字传递客户端连接 |
房间进程 | 1. accept_fd 线程接收新连接2. 设置房主身份 3. 发送线程返回会议室ID |
客户端 | 等待会议室创建响应 |
3. 关键代码逻辑
// 主进程分配房间
void dowithuser(int connfd) {if (msgtype == CREATE_MEETING) {// 查找空闲房间for (i = 0; i < nprocesses; i++) {if (room->pptr[i].child_status == 0) break;}// 传递连接给房间进程char cmd = 'C';write_fd(room->pptr[i].child_pipefd, &cmd, 1, connfd);}
}// 房间进程接收连接
void* accept_fd(void *arg) {read_fd(pipefd, &cmd, 1, &client_fd);if (cmd == 'C') {// 添加为新房主FD_SET(client_fd, &user_pool->fdset);user_pool->owner = client_fd;// 发送会议室ID(PID)给客户端MSG msg;msg.msgType = CREATE_MEETING_RESPONSE;int roomNo = getpid();msg.ptr = (char*)&roomNo;sendqueue.push_msg(msg);}
}
二、加入会议室流程
1. 整体流程
2. 进程/线程交互
组件 | 行为 |
---|---|
主进程 | 1. 解析会议室ID 2. 找到对应房间进程 3. 传递新客户端连接 |
房间进程 | 1. accept_fd 线程接收连接2. 验证会议室状态 3. 发送线程广播通知 |
新客户端 | 接收加入结果和用户列表 |
现有客户端 | 接收新用户加入通知 |
3. 关键代码逻辑
// 房间进程处理加入请求
void* accept_fd(void *arg) {if (cmd == 'J') {// 添加到用户池FD_SET(client_fd, &user_pool->fdset);user_pool->num++;// 广播新用户加入MSG join_msg;join_msg.msgType = PARTNER_JOIN;join_msg.ip = getpeerip(client_fd);sendqueue.push_msg(join_msg);// 发送现有用户列表MSG list_msg;list_msg.msgType = PARTNER_JOIN2;// 收集所有用户IP...sendqueue.push_msg(list_msg);}
}
三、消息收发与转发流程
1. 整体流程
2. 进程/线程交互
组件 | 行为 |
---|---|
发送客户端 | 1. 构造消息包 2. 发送到房间进程 |
房间进程主线程 | 1. 接收原始消息 2. 解析消息头 3. 存入发送队列 |
发送线程池 | 1. 从队列取消息 2. 转换消息类型 3. 广播给其他用户 |
接收客户端 | 接收转换后的消息 |
3. 消息协议格式
$[2字节类型][4字节IP][4字节长度]数据#
示例:文字消息 “Hello” 的发送包:
$0x0001 192.168.1.100 0005 Hello#
4. 关键代码逻辑
// 接收消息
void process_main(...) {if (FD_ISSET(fd, &rset)) {Readn(fd, head, 11); // 读消息头if (msgtype == TEXT_SEND) {// 读取完整数据Readn(fd, data, datalen);// 存入队列(转换为TEXT_RECV)MSG msg;msg.msgType = TEXT_RECV;msg.ptr = data;sendqueue.push_msg(msg);}}
}// 发送线程
void* send_func(void *arg) {MSG msg = sendqueue.pop_msg();// 构建转发包头buf = "$" + 消息类型 + 发送者IP + 数据长度 + 数据 + "#";// 广播给除发送者外的所有用户for (int i = 0; i <= maxfd; i++) {if (user_pool->status[i] == ON && i != msg.targetfd) {writen(i, buf, len);}}
}
四、进程/线程通信关系
1. 整体通信架构
2. 通信方式详解
通信路径 | 技术 | 数据 |
---|---|---|
主进程 ↔ 房间进程 | UNIX域套接字 | 命令字(‘C’,‘J’,‘E’,‘Q’) + 文件描述符 |
房间进程内部线程 | 消息队列 | MSG结构体(类型/数据/目标) |
房间进程 ↔ 客户端 | TCP套接字 | 结构化消息包 |
3. 消息队列实现
class SEND_QUEUE {pthread_mutex_t lock;pthread_cond_t cond;queue<MSG> send_queue;public:void push_msg(MSG msg) {lock();while (queue.full()) wait(cond);send_queue.push(msg);signal(cond);unlock();}MSG pop_msg() {lock();while (queue.empty()) wait(cond);MSG msg = send_queue.front();send_queue.pop();signal(cond);unlock();return msg;}
};
五、异常处理机制
1. 用户掉线处理
2. 关键代码
void fdclose(int fd, int pipefd) {if (user_pool->owner == fd) {// 房主退出user_pool->clear_room();char cmd = 'E';writen(pipefd, &cmd, 1);} else {// 普通用户退出char cmd = 'Q';writen(pipefd, &cmd, 1);// 广播退出通知MSG msg;msg.msgType = PARTNER_EXIT;msg.ip = user_pool->fdToIp[fd];sendqueue.push_msg(msg);}
}
六、性能优化设计
-
线程池处理发送:
// 每个房间进程创建5个发送线程 for (int i = 0; i < SENDTHREADSIZE; i++) {Pthread_create(&tid, NULL, send_func, NULL); }
-
零拷贝发送:
struct iovec iov[2]; iov[0].iov_base = header; iov[0].iov_len = header_len; iov[1].iov_base = payload; iov[1].iov_len = payload_len; writev(fd, iov, 2);
-
批处理状态更新:
// 主进程批量处理状态更新 for (int i = 0; i < nprocesses; i++) {if (FD_ISSET(room->pptr[i].child_pipefd, &rset)) {char cmd;Readn(pipefd, &cmd, 1);// 处理命令} }
总结
本系统通过多进程+多线程架构实现了高效会议通信:
- 进程隔离:每个会议室独立进程,故障隔离
- 线程协作:接收/发送线程分离,避免阻塞
- 高效通信:UNIX域套接字+消息队列减少拷贝
- 负载均衡:进程池动态分配会议室资源
这种架构平衡了性能与稳定性,是实时通信系统的经典实现方案。