redis7.x源码分析:(4) ae事件处理器(一)
ae模块是redis对Reactor模型的封装实现。它的主要代码集中在 ae.c 中,另外还提供了平台相关的io多路复用封装(ae_evport.c、ae_epoll.c、ae_kqueue.c、ae_select.c),它们都实现了一套相同的poll接口,类似于C++中先定义一个接口基类,再由针对不同平台的派生类去实现。这套接口如下:
// Create the platform-specific I/O multiplexing instance
static int aeApiCreate(aeEventLoop *eventLoop)
// Change the number of fds that can be monitored
static int aeApiResize(aeEventLoop *eventLoop, int setsize)
// Free the instance
static void aeApiFree(aeEventLoop *eventLoop)
// Add or modify the events monitored for fd
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
// Remove or modify the events monitored for fd
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask)
// Wait for events on all monitored fds
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
// Return the name of the I/O multiplexing backend in use
static char *aeApiName(void)
实际使用的io模型会根据编译时定义的宏在 ae.c 的代码头部直接引入。
/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending. */
// 根据编译时定义的宏选择应该使用的io复用模型
#ifdef HAVE_EVPORT
// 对应Solaris(event ports)
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    // 对应linux
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        // 对应BSD(FreeBSD、MacOS等)
        #include "ae_kqueue.c"
        #else
        // 以上都不支持时,退回到通用的select
        #include "ae_select.c"
        #endif
    #endif
#endif
以linux为例先来看一下ae_epoll.c的实现,它的代码也比较简单:
/* Allocate the epoll backend state for eventLoop and create the epoll
 * instance.
 *
 * Returns 0 on success and stores the state in eventLoop->apidata.
 * Returns -1 on failure, after releasing whatever was allocated so far. */
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    /* One epoll_event slot per fd the loop can track (setsize). */
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    /* Create the epoll instance. Since kernel 2.6.8 the size argument is
     * ignored (it only has to be greater than zero). */
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    anetCloexec(state->epfd);
    eventLoop->apidata = state;
    return 0;
}

/* ...... (code omitted in this excerpt) ...... */

/* Start monitoring `mask` (AE_READABLE / AE_WRITABLE) on fd, in addition to
 * whatever is already registered for it.
 *
 * Returns 0 on success, -1 if epoll_ctl() fails. */
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    /* Translate the merged ae mask into epoll event bits. */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

/* Stop monitoring the events in `delmask` on fd. If other events remain
 * registered the epoll entry is modified, otherwise it is removed.
 * The epoll_ctl() result is not checked. */
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* Events that remain after removing delmask. */
    int mask = eventLoop->events[fd].mask & (~delmask);

    ee.events = 0;
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (mask != AE_NONE) {
        /* Something is still monitored: modify the registration. */
        epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
    } else {
        /* Note, Kernel < 2.6.9 requires a non null event pointer even for
         * EPOLL_CTL_DEL. */
        epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
    }
}

/* Wait for events on all monitored fds.
 *
 * tvp is the maximum time to wait; NULL means block indefinitely.
 * Fired events are stored in eventLoop->fired[0..n-1] and n is returned.
 * Returns 0 on timeout or EINTR; any other epoll_wait() error panics. */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    /* epoll_wait() takes milliseconds. (tv_usec + 999)/1000 rounds the
     * microsecond part up, so a non-zero tv_usec below 1000 becomes 1ms
     * instead of 0ms (which would make epoll_wait return immediately). */
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            /* Errors and hang-ups are reported as both readable and
             * writable. */
            if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
            /* Record the fired event; fired[] is filled sequentially. */
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    } else if (retval == -1 && errno != EINTR) {
        panic("aeApiPoll: epoll_wait, %s", strerror(errno));
    }

    return numevents;
}
接下来再看一下ae相关接口,先看下fd相关实现。
主要结构体定义如下:
/* File event structure */
typedef struct aeFileEvent {int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */ // 事件类型aeFileProc *rfileProc; // 读事件回调aeFileProc *wfileProc; // 写事件回调void *clientData; // 上下文指针
} aeFileEvent;/* Time event structure */
typedef struct aeTimeEvent {long long id; /* time event identifier. */ // 定时器事件idmonotime when; // 触发时间aeTimeProc *timeProc; // 定时器触发回调aeEventFinalizerProc *finalizerProc; // 定时器销毁时执行的回调void *clientData; // 上下文指针struct aeTimeEvent *prev; // 前一个定时器事件struct aeTimeEvent *next; // 后一个定时器事件int refcount; /* refcount to prevent timer events from being* freed in recursive time event calls. */ // 引用计数
} aeTimeEvent;/* A fired event */
typedef struct aeFiredEvent {int fd; // 触发事件的定时器int mask; // 触发的事件类型
} aeFiredEvent;/* State of an event based program */
typedef struct aeEventLoop {int maxfd; /* highest file descriptor currently registered */ // 注册的最大fd值int setsize; /* max number of file descriptors tracked */ // 注册的fd数量long long timeEventNextId; // 自增用于生成定时器idaeFileEvent *events; /* Registered events */ // 注册的fd事件aeFiredEvent *fired; /* Fired events */ // 触发的fd事件aeTimeEvent *timeEventHead; // 定时器事件链表头int stop; // 结束标志void *apidata; /* This is used for polling API specific data */ // 不同poll的上下文信息aeBeforeSleepProc *beforesleep; // poll前调用aeBeforeSleepProc *aftersleep; // poll后调用int flags;
} aeEventLoop;
aeEventLoop中的events是用于保存fd注册事件的数组,它是以fd值作为索引来存取事件的。
创建eventloop事件管理器:
aeEventLoop *aeCreateEventLoop(int setsize) {aeEventLoop *eventLoop;int i;// 初始化一下单调时钟使用哪种方式实现monotonicInit(); /* just in case the calling app didn't initialize */if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;// events、fired数组以fd值为下标,直接存取对应fd的相关数据eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;eventLoop->setsize = setsize;eventLoop->timeEventHead = NULL;eventLoop->timeEventNextId = 0;eventLoop->stop = 0;eventLoop->maxfd = -1;eventLoop->beforesleep = NULL;eventLoop->aftersleep = NULL;eventLoop->flags = 0;// 根据平台创建io模型实例if (aeApiCreate(eventLoop) == -1) goto err;/* Events with mask == AE_NONE are not set. So let's initialize the* vector with it. */for (i = 0; i < setsize; i++)eventLoop->events[i].mask = AE_NONE;return eventLoop;err:if (eventLoop) {zfree(eventLoop->events);zfree(eventLoop->fired);zfree(eventLoop);}return NULL;
}
添加和删除事件:
/* Register `proc` as the handler for the events in `mask` on fd.
 *
 * Returns AE_OK on success. Returns AE_ERR if fd does not fit in the
 * events array (errno is set to ERANGE) or if the polling backend
 * rejects the registration. */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    /* fd is used as an index into events[], so it must be below setsize. */
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    /* Add or modify the read/write events for fd in the backend first;
     * the bookkeeping below is only updated on success. */
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

/* Stop handling the events in `mask` on fd. Does nothing if fd is out of
 * range or has no events registered. */
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
{
    if (fd >= eventLoop->setsize) return;
    aeFileEvent *fe = &eventLoop->events[fd];
    if (fe->mask == AE_NONE) return;

    /* We want to always remove AE_BARRIER if set when AE_WRITABLE
     * is removed. */
    if (mask & AE_WRITABLE) mask |= AE_BARRIER;

    aeApiDelEvent(eventLoop, fd, mask);
    fe->mask = fe->mask & (~mask);
    /* If the fd we just cleared was the highest registered one and it has
     * no events left, maxfd has to be recomputed. */
    if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
        /* Update the max fd */
        int j;

        /* Scan downwards for the first fd that still has events; j ends
         * at -1 when none is left. */
        for (j = eventLoop->maxfd-1; j >= 0; j--)
            if (eventLoop->events[j].mask != AE_NONE) break;
        eventLoop->maxfd = j;
    }
}
事件处理函数,处理一轮事件:
/* Process one round of events: poll for file events (sleeping at most
 * until the earliest time event is due), dispatch the fired file events,
 * then process time events.
 *
 * `flags` is a combination of AE_FILE_EVENTS, AE_TIME_EVENTS,
 * AE_DONT_WAIT, AE_CALL_BEFORE_SLEEP and AE_CALL_AFTER_SLEEP.
 *
 * Returns the number of processed file/time events. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want to call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        struct timeval tv, *tvp;
        int64_t usUntilTimer = -1;

        /* Microseconds until the earliest time event is due. */
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            usUntilTimer = usUntilEarliestTimer(eventLoop);

        if (usUntilTimer >= 0) {
            tv.tv_sec = usUntilTimer / 1000000;
            tv.tv_usec = usUntilTimer % 1000000;
            tvp = &tv;
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        /* The loop-wide AE_DONT_WAIT flag also forces a zero timeout. */
        if (eventLoop->flags & AE_DONT_WAIT) {
            tv.tv_sec = tv.tv_usec = 0;
            tvp = &tv;
        }

        /* Pre-poll hook.
         * NOTE(review): the article states that client reads/writes and
         * command processing (including threaded I/O when enabled) are
         * driven from the server's beforesleep callback; that is not
         * visible in this excerpt. */
        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        /* Dispatch the file events reported by the poll. */
        for (j = 0; j < numevents; j++) {
            int fd = eventLoop->fired[j].fd;
            aeFileEvent *fe = &eventLoop->events[fd];
            int mask = eventLoop->fired[j].mask;
            int fired = 0; /* Number of events fired for current fd. */

            /* Normally we execute the readable event first, and the writable
             * event later. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsyncing a file to disk,
             * before replying to a client. */
            int invert = fe->mask & AE_BARRIER;

            /* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and we still
             * didn't processed, so we check if the event is still valid.
             *
             * Fire the readable event if the call sequence is not
             * inverted. */
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                /* When the read and write handlers are the same function
                 * it is called only once per fd per round. */
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
执行主事件循环:
/* Run the main event loop: keep processing rounds of events until
 * eventLoop->stop becomes non-zero. */
void aeMain(aeEventLoop *eventLoop) {
    /* Clear the stop flag so the loop below runs. */
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        /* One round: all file and time events, with the before/after
         * sleep hooks enabled. */
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}