当前位置: 首页 > news >正文

利用升序定时器链表处理非活动连接

参考自游双《Linux高性能服务器编程》

背景

服务器同常需要定期处理非活动连接:给客户发一个重连请求,或关闭该连接,或者其他。我们可以通过使用升序定时器链表处理非活动连接,下面的代码利用alarm函数周期性的触发SIGALRM信号,该信号的处理函数利用管道通知主循环执行定时器链表上的定时任务—关闭非活动连接。

实现代码

升序定时器链表

定时器通常包含两个成员:超时时间和任务回调函数。

有时还会包含回调函数被执行时需要传入的参数。

下方代码实现了一个简单的升序定时器链表,按照超时时间做升序排列。

// lst_timer.h
// 升序定时器链表
#ifndef LST_TIMER
#define LST_TIMER#include <time.h>
#define BUFFER_SIZE 64
class util_timer;// 用户数据结构
struct client_data
{sockaddr_in address;   // 客户端socket地址int sockfd;            // socket 文件描述符char buf[BUFFER_SIZE]; // 读缓冲util_timer *timer;     // 链表
};// 定时器类
class util_timer
{
public:util_timer() : prev(NULL), next(NULL) {}public:time_t expire;                  // 任务的超时时间,绝对时间void (*cb_func)(client_data *); // 任务回调函数client_data *user_data;         // 回调函数处理的客户数据,由定时器执行者传递给回调函数util_timer *prev;util_timer *next;
};// 定时器链表,升序,双向,有头尾节点
class sort_timer_lst
{
public:sort_timer_lst() : head(NULL), tail(NULL){};// 删除所有定时器~sort_timer_lst(){util_timer *tmp = head;while (tmp){head = tmp->next;delete tmp;tmp = head;}}// 将定时器timer添加到链表中void add_timer(util_timer *timer){if (!timer){return;}if (!head) // 空链表{head = tail = timer;return;}// 若目标定时器超时时间小于当前链表中所有定时器的超时时间// 则把该定时器插入到头部,作为链表头节点// 否则就要插入合适的位置以保证升序if (timer->expire < head->expire){timer->next = head;head->prev = timer;head = timer;return;}add_timer(timer, head);}// 当某个定时任务发生变化时,调整对应的定时器的超时时间// 这个函数只考虑被调整的定时器的【超时时间的延长情况】,即该定时器要往链表尾部移动void adjust_timer(util_timer *timer){if (!timer){return;}util_timer *tmp = timer->next;// 被调整定时器在链表尾部,或该定时器超时时间仍小于下一个定时器的超时时间,则不用调整if (!tmp || (timer->expire < tmp->expire)){return;}// 若目标定时器时链表头节点,则将该定时器取出重新插入链表if (timer == head){head = head->next;head->prev = NULL;timer->next = NULL;add_timer(timer, head);}// 若目标定时器不是链表头节点,则将该定时器从链表中取出,然后插入原来所在位置之后的部分链表中else{timer->prev->next = timer->next;timer->next->prev = timer->prev;add_timer(timer, timer->next);}}void del_timer(util_timer *timer){if (!timer){return;}// 链表只剩待删除定时器if ((timer == head) && (timer == tail)){delete timer;head = NULL;tail = NULL;return;}if (timer == head){head = head->next;head->prev = NULL;delete timer;return;}if(timer == tail) {tail = tail->prev;tail->next = NULL;delete timer;return;}// 目标定时器位于链表中间timer->prev->next = timer->next;timer->next->prev = timer->prev;delete timer;}// SIGALARM信号每次触发就在其信号处理函数中执行一次tick函数// 来处理链表上到期的任务。void tick(){if(!head){return ;}printf("timer tick\n");time_t cur = time(NULL);util_timer *tmp = head;// 从头开始依次处理每个定时器,直到遇到一个尚未到期的定时器while(tmp){// 未来的时间比现在的时间大if(cur < tmp->expire){break;}tmp->cb_func(tmp->user_data);head = tmp->next;if(head){head->prev = NULL;}delete tmp;tmp = head;}}
private:// 重载的辅助函数// 被add_timer和adjust_timer调用// 功能:将目标定时器timer添加到lst_head之后的部分链表中void add_timer(util_timer *timer, util_timer *lst_head){util_timer *prev = lst_head;util_timer *tmp = prev->next; // 可能插入的位置while(tmp) {if(timer->expire < tmp->expire){prev->next = timer;timer->next = tmp;tmp->prev = timer;timer->prev = prev;break;}prev = tmp;tmp = tmp->next;}if(!tmp){prev->next = timer;timer->prev = prev;timer->next = NULL;tail = timer;}}
private:util_timer *head;util_timer *tail;
};
#endif

处理非活动连接

// 11_3_closeUnactiveConnections.cpp
// 利用alarm函数周期性触发 SIGALRM信号
// 该信号的信号处理函数利用管道通知主循环执行定时器链表上的定时任务即关闭非活动链接
// 一个用户对应一个连接fd、一个定时器检测是否活跃
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <signal.h>
#include <unistd.h>
#include <netinet/in.h>
#include <errno.h>
#include <stdlib.h>
#include "lst_timer.h"#define FD_LIMIT 65535  
#define MAX_EVENT_NUMBER 1024
#define TIMESLOT 5static int pipefd[2]; // 管道传输信号
// 利用升序链表管理定时器
static sort_timer_lst timer_lst;
static int epollfd = 0;int setnonblocking(int fd)
{int old_option = fcntl(fd, F_GETFL);int new_option = old_option | O_NONBLOCK;fcntl(fd, F_SETFL, new_option);return old_option;
}void addfd(int epollfd, int fd)
{epoll_event event;event.data.fd = fd;event.events = EPOLLIN | EPOLLET; // 注册可读事件epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);setnonblocking(fd);
}void sig_handler(int sig)
{int save_errno = errno;int msg = sig;send(pipefd[1], (char *)&msg, 1, 0);errno = save_errno;
}void addsig(int sig)
{struct sigaction sa;memset(&sa, '\0', sizeof(sa));sa.sa_handler = sig_handler;sa.sa_flags |= SA_RESTART;sigfillset(&sa.sa_mask); // 设置所有信号// 为信号注册处理函数assert(sigaction(sig, &sa, NULL) != -1);
}void timer_handler()
{// 定时处理任务,检查有没有到时的定时器,执行其对应任务timer_lst.tick();// 重新定时alarm(TIMESLOT); // 到时会发出SIGALARM信号
}// 定时器回调函数,删除非活动连接socket上的注册事件,并关闭之
void cb_func(client_data *user_data)
{epoll_ctl(epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0);assert(user_data);close(user_data->sockfd);printf("close fd %d\n", user_data->sockfd);
}int main(int argc, char *argv[])
{if (argc <= 2){printf("usage: %s ip_address port_num\n", basename(argv[0]));return 1;}const char *ip = argv[1];int port = atoi(argv[2]);int ret = 0;struct sockaddr_in addr;bzero(&addr, sizeof(addr));addr.sin_family = AF_INET;inet_pton(AF_INET, ip, &addr.sin_addr);addr.sin_port = htons(port);// 创建TCP socket,并将其绑定到端口port上int listenfd = socket(PF_INET, SOCK_STREAM, 0);assert(listenfd >= 0);// 设置端口复用int opt = 1;setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));ret = bind(listenfd, (struct sockaddr *)&addr, sizeof(addr));assert(ret != -1);ret = listen(listenfd, 5);assert(ret != -1);epoll_event events[MAX_EVENT_NUMBER];int epollfd = epoll_create(5);assert(epollfd != -1);addfd(epollfd, listenfd);// 管道ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd);assert(ret != -1);setnonblocking(pipefd[1]); // 设置写端非阻塞addfd(epollfd, pipefd[0]); // 将读端加入epoll树中进行监视// 设置信号处理函数addsig(SIGALRM); // SIGALRM 到来往管道写端发送信号的数值addsig(SIGTERM);bool stop_server = false;client_data *users = new client_data[FD_LIMIT]; // 客户端数组bool timeout = false;alarm(TIMESLOT);while(!stop_server) {int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);if((number < 0) && (errno != EINTR)){printf("epoll failure\n");break;}for(int i = 0; i < number; ++i){int sockfd = events[i].data.fd;if(sockfd == listenfd){struct sockaddr_in client_address;socklen_t client_addrlength = sizeof(client_address);int connfd = accept(listenfd, (sockaddr*)&client_address, &client_addrlength);addfd(epollfd, connfd); // users[connfd].address = client_address;users[connfd].sockfd = connfd;// 创建定时器,设置其回调函数与超时时间,然后绑定定时器与用户数据// 最后将定时器添加到链表 timer_lst中util_timer *timer = new util_timer;timer->user_data = &users[connfd];timer->cb_func = cb_func;time_t cur = time(NULL);// 设置过期时间,当前时间超过该时间就要回收该定时器绑定的connfdtimer->expire = cur + 3 * TIMESLOT;users[connfd].timer = timer;timer_lst.add_timer(timer);}// 处理信号else if((sockfd == pipefd[0]) && (events[i].events & EPOLLIN)){int sig;char signals[1024];// 管道读端接受数据// send是在SIGARLRM和SIGTERM信号被触发时,通过sig_handler函数来调用的ret = recv(pipefd[0], signals, sizeof(signals), 0);if(ret == -1){continue; // 处理下一个到来的事件}else if(ret == 0){continue;}else{for(int i = 0; i < ret; ++i){switch(signals[i]){case SIGALRM:{// timeout标志有定时任务要处理// 但不立即处理,因为通常定时任务优先级不高timeout = true;break;}case SIGTERM:{stop_server = true;}}}}}// 处理客户连接上收到的数据else if(events[i].events & EPOLLIN){memset(users[sockfd].buf, BUFFER_SIZE - 1, 0);ret = recv(sockfd, users[sockfd].buf, BUFFER_SIZE - 1, 0);printf("get %d bytes of client data %s from %d \n", ret, users[sockfd].buf, sockfd);util_timer *timer = users[sockfd].timer;if(ret < 0){if(errno != EAGAIN){cb_func(&users[sockfd]); // 回收connfdif(timer){timer_lst.del_timer(timer);}}}else if(ret == 0){// 若对方关闭连接,则我们也关闭连接并删除定时器cb_func(&users[sockfd]);if(timer){timer_lst.del_timer(timer);}}else{// 若某个客户的连接上有数据可读// 则要调整对应的定时器的过期时间(通过users数组找到定时器)if(timer){time_t cur = time(NULL);timer->expire = cur + 3 * TIMESLOT;printf("adjust timer once\n");timer_lst.adjust_timer(timer);}else{// other}}}}// 最后处理定时事件,因为通常IO事件有更高的优先级// 但这样导致定时任务不能精确的执行if(timeout){timer_handler(); // 检查是否有到时(太久没有使用)的定时器(对应一个用户的connfd),有就回收fd删除定时器timeout = false;}}close(listenfd);close(pipefd[1]);close(pipefd[2]);delete []users;return 0;
}

测试

目录结构

.
├── 11_3_closeUnactiveConnections.cpp
├── build
├── CMakeLists.txt
└── lst_timer.h

输入编译指令

g++ -o closeConnection 11_3_closeUnactiveConnections.cpp -I ./

也可以使用CMake

cmake_minimum_required (VERSION 2.8)
PROJECT(closeConnection)
# 手动加入文件
SET(SRC_LIST 11_3_closeUnactiveConnections.cpp)#INCLUDE_DIRECTORIES("${CMAKE_CURRENT_SOURCE_DIR}/dir1")
# 相对路径的方式
INCLUDE_DIRECTORIES(.)# 用SRC_LIST所存的名字的源文件来生成可执行文件 darren
ADD_EXECUTABLE(closeConnection ${SRC_LIST} )

执行程序

在本机任意地址的6666端口监听,同一个机器上不同会话使用客户端程序连接服务器

情况1

当客户端连接上服务器后,若socket在三次tick时间里没有IO操作,第四次tick时就回收socket。

服务器

在这里插入图片描述

客户端

在这里插入图片描述

情况2

当客户端连接上服务器后,若socket在三次tick时间里有IO操作,就会续上3次tick的时间( 3 * TIMESLOT)。

如下在第二次tick后,客户端向服务器发送了一条数据 hello

服务器

在这里插入图片描述

客户端

在这里插入图片描述

http://www.lryc.cn/news/3553.html

相关文章:

  • MySQL 开发规范
  • 【C语言进阶】预处理与程序环境
  • 【Docker知识】将环境变量传递到容器
  • Allegro如何更改铜皮显示密度操作指导
  • ThinkPHP5酒店预订管理系统
  • 【MySQL】MyCat分库分表分片规则配置详解与实战(MySQL专栏启动)
  • OpenWrt路由器设置域名动态解析手把手教程
  • java流浪动物救助系统(毕业设计)
  • 阿里代码规范插件中,Apache Beanutils为什么被禁止使用?
  • NFC enable NFC使能流程
  • Redis实例绑定CPU物理核优化Redis性能
  • STC15中断系统介绍
  • 力扣HOT100 11-15
  • 深入浅出单调栈与单调队列
  • 深入C语言——实现可变参数函数
  • 41-Dockerfile-Dockerfile简介
  • 【408】操作系统 - 刻骨铭心自测题1(上)
  • 【老卫拆书】009期:Vue+Node肩挑全栈!《Node.js+Express+MongoDB+Vue.js全栈开发实战》开箱
  • 【LeetCode】动态规划总结
  • CAS详解.
  • Mock.js初步使用(浏览器端)
  • opencv保存图片
  • 【c++】数据类型
  • Elasticsearch的写的底层原理
  • 【网络编程】Java中的Socket
  • 有趣的Hack-A-Sat黑掉卫星挑战赛——跟踪卫星
  • Ubuntu安装配置Cuda和Pytorch gpu
  • 三、Java面向对象
  • pygame7 弹球游戏2
  • 计算机网络4:计算机网络体系结构