当前位置: 首页 > news >正文

redis7.x源码分析:(4) ae事件处理器(一)

ae模块是redis实现的Reactor模型的封装。它的主要代码实现集中在 ae.c 中,另外还提供了平台相关的io多路复用的封装,它们都实现了一套相同的poll接口,就类似于C++中提供了一个接口基类,由针对不同平台的派生类去实现。

// 创建平台相关的io模型实例
static int aeApiCreate(aeEventLoop *eventLoop)
// 修改可侦听的fd数量
static int aeApiResize(aeEventLoop *eventLoop, int setsize)
// 释放实例
static void aeApiFree(aeEventLoop *eventLoop)
// 添加或修改fd的侦听事件
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
// 删除或修改fd的侦听事件
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask)
// 侦听所有fd的事件
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
// 返回使用的io模型名称
static char *aeApiName(void)

实际使用的io模型会根据编译时定义的宏在 ae.c 的代码头部直接引入。

/* Include the best multiplexing layer supported by this system.* The following should be ordered by performances, descending. */
// 根据不同操作系统下不同的宏选择应该使用的io复用模型
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else#ifdef HAVE_EPOLL// 对应linux#include "ae_epoll.c"#else#ifdef HAVE_KQUEUE// 对应BSD(FreeBSD、MacOS等)#include "ae_kqueue.c"#else// 对应Solaris#include "ae_select.c"#endif#endif
#endif

以linux为例先来看一下ae_epoll.c的实现,它的代码也比较简单:

static int aeApiCreate(aeEventLoop *eventLoop) {aeApiState *state = zmalloc(sizeof(aeApiState));if (!state) return -1;// 分配events数组大小state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);if (!state->events) {zfree(state);return -1;}// 创建epoll, kernal 2.6.8 之后参数被忽略了state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */if (state->epfd == -1) {zfree(state->events);zfree(state);return -1;}anetCloexec(state->epfd);eventLoop->apidata = state;return 0;
}......static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {aeApiState *state = eventLoop->apidata;struct epoll_event ee = {0}; /* avoid valgrind warning *//* If the fd was already monitored for some event, we need a MOD* operation. Otherwise we need an ADD operation. */// 没注册过, 认为是添加fd到epollint op = eventLoop->events[fd].mask == AE_NONE ?EPOLL_CTL_ADD : EPOLL_CTL_MOD;ee.events = 0;// 合并新老事件mask |= eventLoop->events[fd].mask; /* Merge old events */// 设置需要的事件if (mask & AE_READABLE) ee.events |= EPOLLIN;if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;ee.data.fd = fd;if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;return 0;
}static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {aeApiState *state = eventLoop->apidata;struct epoll_event ee = {0}; /* avoid valgrind warning */// 删除指定事件int mask = eventLoop->events[fd].mask & (~delmask);ee.events = 0;// 设置需要的事件if (mask & AE_READABLE) ee.events |= EPOLLIN;if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;ee.data.fd = fd;// 没有事件执行删除,否则执行修改if (mask != AE_NONE) {epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);} else {/* Note, Kernel < 2.6.9 requires a non null event pointer even for* EPOLL_CTL_DEL. */epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);}
}static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {aeApiState *state = eventLoop->apidata;int retval, numevents = 0;// 此处的 (tvp->tv_usec + 999)/1000 是为了避免当 tvp->tv_usec < 1000us 时转换成毫秒为 0ms, 导致epoll_wait不会等待// 因此这么写之后能保证只要 tvp->tv_usec 不为0,那么至少都能等待 1ms 时长retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);if (retval > 0) {int j;numevents = retval;for (j = 0; j < numevents; j++) {int mask = 0;struct epoll_event *e = state->events+j;if (e->events & EPOLLIN) mask |= AE_READABLE;if (e->events & EPOLLOUT) mask |= AE_WRITABLE;if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;// 保存触发的事件eventLoop->fired[j].fd = e->data.fd;eventLoop->fired[j].mask = mask;}} else if (retval == -1 && errno != EINTR) {panic("aeApiPoll: epoll_wait, %s", strerror(errno));}return numevents;
}

接下来再看一下ae相关接口,先看下fd相关实现。

主要结构体定义如下:

/* File event structure */
typedef struct aeFileEvent {int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */ // 事件类型aeFileProc *rfileProc; // 读事件回调aeFileProc *wfileProc; // 写事件回调void *clientData; // 上下文指针
} aeFileEvent;/* Time event structure */
typedef struct aeTimeEvent {long long id; /* time event identifier. */ // 定时器事件idmonotime when; // 触发时间aeTimeProc *timeProc; // 定时器触发回调aeEventFinalizerProc *finalizerProc; // 定时器销毁时执行的回调void *clientData; // 上下文指针struct aeTimeEvent *prev; // 前一个定时器事件struct aeTimeEvent *next; // 后一个定时器事件int refcount; /* refcount to prevent timer events from being* freed in recursive time event calls. */ // 引用计数
} aeTimeEvent;/* A fired event */
typedef struct aeFiredEvent {int fd; // 触发事件的定时器int mask; // 触发的事件类型
} aeFiredEvent;/* State of an event based program */
typedef struct aeEventLoop {int maxfd;   /* highest file descriptor currently registered */ // 注册的最大fd值int setsize; /* max number of file descriptors tracked */ // 注册的fd数量long long timeEventNextId; // 自增用于生成定时器idaeFileEvent *events; /* Registered events */ // 注册的fd事件aeFiredEvent *fired; /* Fired events */ // 触发的fd事件aeTimeEvent *timeEventHead; // 定时器事件链表头int stop; // 结束标志void *apidata; /* This is used for polling API specific data */ // 不同poll的上下文信息aeBeforeSleepProc *beforesleep; // poll前调用aeBeforeSleepProc *aftersleep; // poll后调用int flags;
} aeEventLoop;

aeEventLoop中的events是用于保存fd注册事件的数组,它是以fd值作为索引来存取事件的。

创建eventloop事件管理器:

aeEventLoop *aeCreateEventLoop(int setsize) {aeEventLoop *eventLoop;int i;// 初始化一下单调时钟使用哪种方式实现monotonicInit();    /* just in case the calling app didn't initialize */if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;// events、fired数组以fd值为下标,直接存取对应fd的相关数据eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;eventLoop->setsize = setsize;eventLoop->timeEventHead = NULL;eventLoop->timeEventNextId = 0;eventLoop->stop = 0;eventLoop->maxfd = -1;eventLoop->beforesleep = NULL;eventLoop->aftersleep = NULL;eventLoop->flags = 0;// 根据平台创建io模型实例if (aeApiCreate(eventLoop) == -1) goto err;/* Events with mask == AE_NONE are not set. So let's initialize the* vector with it. */for (i = 0; i < setsize; i++)eventLoop->events[i].mask = AE_NONE;return eventLoop;err:if (eventLoop) {zfree(eventLoop->events);zfree(eventLoop->fired);zfree(eventLoop);}return NULL;
}

添加和删除事件:

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,aeFileProc *proc, void *clientData)
{// 超出events数组有效范围直接报错if (fd >= eventLoop->setsize) {errno = ERANGE;return AE_ERR;}aeFileEvent *fe = &eventLoop->events[fd];// 添加或修改fd读写事件if (aeApiAddEvent(eventLoop, fd, mask) == -1)return AE_ERR;fe->mask |= mask;if (mask & AE_READABLE) fe->rfileProc = proc;if (mask & AE_WRITABLE) fe->wfileProc = proc;fe->clientData = clientData;if (fd > eventLoop->maxfd)eventLoop->maxfd = fd;return AE_OK;
}void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{if (fd >= eventLoop->setsize) return;aeFileEvent *fe = &eventLoop->events[fd];if (fe->mask == AE_NONE) return;/* We want to always remove AE_BARRIER if set when AE_WRITABLE* is removed. */if (mask & AE_WRITABLE) mask |= AE_BARRIER;aeApiDelEvent(eventLoop, fd, mask);fe->mask = fe->mask & (~mask);// 如果本次删除事件的fd是最大的fd,并且该fd已经没有事件了(可以认为该fd被清除了)if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {/* Update the max fd */int j;// 从后往前遍历找到第一个有事件的fd赋给maxfdfor (j = eventLoop->maxfd-1; j >= 0; j--)if (eventLoop->events[j].mask != AE_NONE) break;eventLoop->maxfd = j;}
}

事件处理函数,处理一轮事件:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{int processed = 0, numevents;/* Nothing to do? return ASAP */if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;/* Note that we want to call select() even if there are no* file events to process as long as we want to process time* events, in order to sleep until the next time event is ready* to fire. */if (eventLoop->maxfd != -1 ||((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {int j;struct timeval tv, *tvp;int64_t usUntilTimer = -1;// 获取最近的一个定时器距当前时间的触发时长if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))usUntilTimer = usUntilEarliestTimer(eventLoop);if (usUntilTimer >= 0) {tv.tv_sec = usUntilTimer / 1000000;tv.tv_usec = usUntilTimer % 1000000;tvp = &tv;} else {/* If we have to check for events but need to return* ASAP because of AE_DONT_WAIT we need to set the timeout* to zero */if (flags & AE_DONT_WAIT) {tv.tv_sec = tv.tv_usec = 0;tvp = &tv;} else {/* Otherwise we can block */tvp = NULL; /* wait forever */}}// 不需要等待,超时时间设为0if (eventLoop->flags & AE_DONT_WAIT) {tv.tv_sec = tv.tv_usec = 0;tvp = &tv;}// 执行poll前的回调(对fd读写以及命令处理都会在beforsleep中执行, 如果开启多线程读写也会在它内部一并处理)if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)eventLoop->beforesleep(eventLoop);/* Call the multiplexing API, will return only on timeout or when* some event fires. */// 等待事件numevents = aeApiPoll(eventLoop, tvp);/* After sleep callback. */// 执行poll后的回调if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)eventLoop->aftersleep(eventLoop);// 处理poll到的读写事件for (j = 0; j < numevents; j++) {int fd = eventLoop->fired[j].fd;aeFileEvent *fe = &eventLoop->events[fd];int mask = eventLoop->fired[j].mask;int fired = 0; /* Number of events fired for current fd. *//* Normally we execute the readable event first, and the writable* event later. This is useful as sometimes we may be able* to serve the reply of a query immediately after processing the* query.** However if AE_BARRIER is set in the mask, our application is* asking us to do the reverse: never fire the writable event* after the readable. In such a case, we invert the calls.* This is useful when, for instance, we want to do things* in the beforeSleep() hook, like fsyncing a file to disk,* before replying to a client. */// 是否反转读写事件的顺序(不设置 AE_BARRIER 时,先读后写)int invert = fe->mask & AE_BARRIER;/* Note the "fe->mask & mask & ..." code: maybe an already* processed event removed an element that fired and we still* didn't processed, so we check if the event is still valid.** Fire the readable event if the call sequence is not* inverted. */if (!invert && fe->mask & mask & AE_READABLE) {// 处理读事件回调fe->rfileProc(eventLoop,fd,fe->clientData,mask);fired++;// 防止在事件处理函数中 resize 过数组大小,所以需要重新获取下地址fe = &eventLoop->events[fd]; /* Refresh in case of resize. */}/* Fire the writable event. */if (fe->mask & mask & AE_WRITABLE) {// 处理写事件回调(读写回调函数相同时,每轮poll只执行一次读或者写回调)if (!fired || fe->wfileProc != fe->rfileProc) {fe->wfileProc(eventLoop,fd,fe->clientData,mask);fired++;}}/* If we have to invert the call, fire the readable event now* after the writable one. */if (invert) {// 反转读写时先写,然后在这边后读(读写回调函数相同时,每轮poll只执行一次读或者写回调)fe = &eventLoop->events[fd]; /* Refresh in case of resize. */if ((fe->mask & mask & AE_READABLE) &&(!fired || fe->wfileProc != fe->rfileProc)){fe->rfileProc(eventLoop,fd,fe->clientData,mask);fired++;}}processed++;}}/* Check time events */// 处理定时器事件if (flags & AE_TIME_EVENTS)processed += processTimeEvents(eventLoop);return processed; /* return the number of processed file/time events */
}

执行主事件循环:

void aeMain(aeEventLoop *eventLoop) {// 启动事件循环eventLoop->stop = 0;while (!eventLoop->stop) {// 处理事件aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);}
}

http://www.lryc.cn/news/489638.html

相关文章:

  • 【React】React Router:深入理解前端路由的工作原理
  • 51单片机-独立按键与数码管联动
  • visual studio 2005的MFC各种线程函数之间的调用关系
  • 网页中调用系统的EXE文件,如打开QQ
  • 【单点知识】基于PyTorch讲解自动编码器(Autoencoder)
  • Halo 正式开源: 使用可穿戴设备进行开源健康追踪
  • summernote富文本批量上传音频,视频等附件
  • IDEA如何设置编码格式,字符编码,全局编码和项目编码格式
  • 【计算机网络实验】之静态路由配置
  • 十五届蓝桥杯赛题-c/c++ 大学b组
  • 基础自动化系统的任务
  • DBeaver添加地图查看器的自定义底图
  • STM32F103C8T6实时时钟RTC
  • Python Selenium:Web自动化测试与爬虫开发
  • Java-07 深入浅出 MyBatis - 一对多模型 SqlMapConfig 与 Mapper 详细讲解测试
  • 用CAXA CAD电子图板导入图框、标题栏并导出pdf的方法
  • 深入了解 Linux htop 命令:功能、用法与示例
  • JDK1.8新增特性
  • 环境背景文本到语音转换
  • 后端数据增删改查基于Springboot+mybatis mysql 时间根据当时时间自动填充,数据库连接查询不一致,mysql数据库连接不好用
  • 《Python编程实训快速上手》第九天--调试技巧
  • html5复习一
  • SSL/TLS,SSL,TLS分别是什么
  • css iframe标签使用
  • API的妙用
  • HTML5超酷响应式视频背景动画特效(六种风格,附源码)
  • Spire.PDF for .NET【页面设置】演示:打开 PDF 时自动显示书签或缩略图
  • 算法中常用到的数学知识:埃拉托色尼筛法(获取质数)、欧几里得算法(求两个数最大公因数)
  • 实战OpenCV之人脸识别
  • 图像预处理之图像滤波