reactor模式学习
1.什么是reactor模式
2.reactor模式解决的问题是什么
3.reactor模式的实现方式有哪些
一. 什么是reactor模式
讲这个之前,先整体上看看,一般的服务端架构设计都是怎么做的。
(1) 经典的Thread-Based Architecture(线程模式)。
线程模式也有多种做法:
① accept read send等I/O操作,以及数据解析,业务处理等具体逻辑都在一个线程中完成。属于最基本的网络服务流程,要求是客户端连接是一次性的,不涉及来回交互(类似UDP通信),并且读写的数据量很少,业务逻辑很简单,处理速度很快。如此才能保证多客户端的及时响应,通常用于UDP循环服务器模型。(循环服务器模型指的是:服务器在处理一个客户端时不接受其他客户端的连接。并发服务器模型指的是: 服务器可以处理客户端并发访问。本篇笔记主要讨论并发服务器模型)
② accept 放在主线程中,一旦有客户端连接,则将连接下放到任务队列,后面跟一个线程池,不断从队列中取走连接任务,在子线程中执行read->parse->process->send流程。
优点: 简单直观,适合并发量不是很大的场景。
缺点:连接和线程始终保持一一对应的关系。如果客户端很多,并且都是长连接的话,会使大量线程一直是keep-Alive状态,并在空闲状态下等待,浪费大量的内存和堆栈空间;此外一个进程能够开启的线程数量是有限的,意味着任务队列也不能太大(太大了没有意义,即使放入了队列,也没办法被被及时处理),虽然通过多进程可以缓解(每个进程都有一个accpet,和一个线程池),但是治标不治本。
(2) Event-driven architecture (事件驱动模式)
将I/O的各种状态的变化封装成为事件,并使用I/O多路复用技术,实现单个线程处理多个客户端的I/O事件(可读/可写事件)。(复用,我的理解就是多条连接共享同一个线程)。需要注意的是,parse->process这两步是具体的业务逻辑处理,不会被放在这个I/O线程内部,一般是需要和I/O线程解耦的,可以通过额外任务队列和工作线程池(workers pool)实现。
reactor是事件驱动模式的一种具体参考指导(类似于指导规范),它在很多网络通信库中都有各自具体的实现,如ACE, libevent,Mudo,ZeroMQ以及java中的netty等。另外,redis服务端也实现了reactor模式。为了进一步提高并发性和吞吐量,reactor有时也被设计成多I/O线程,即主I/O线程只管accept,网络I/O的读写操作放到子I/O线程中去。(最新的redis6.x已经采用了这种方式,不再是单I/O线程了,不过它对内存数据库的读和写(即命令的执行)依然还是放在主I/O线程处理,子I/O线程只是负责处理客户端命令的读取/解析以及命令执行结果的返回,所以数据库中的数据始终是线程安全的)。
二. reactor模式解决的问题
根据上面的总结可知,reactor模式(或者说事件驱动模式)主要解决的就是多客户端高并发访问时,网络服务端如何设计才能依然保持稳健运行的问题。
三. reactor 的实现方法
实际编写正确的reactor代码是比较繁琐的,经过搜索发现网上有示例,遂搬运,修改,调试之,增加了一些自己的理解,修改了reactor事件集的结构以满足同时监听客户端读写事件的需求。原博文地址:https://blog.csdn.net/qq_41453285/article/details/103001772。客户端的压力测试可以用原博文提供的程序,也可以自己手动写一个简单的测试程序。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/epoll.h>
#include <signal.h>
#include <errno.h>
#include <time.h>
#include <libgen.h>
#include <fcntl.h>#define MAX_EPOLL_EVENTS 1024
#define MAX_BUFFER_SIZE 4096typedef int (*NCALLBACK)(int, int, void*);struct Revent {int fd; // 事件对应的fdint events; // 事件类型( 本代码中我们只处理EPOLL_IN和EPOLL_OUT)void *arg; // 实际传入的是一个struct Reactor结构体指针int (*callback)(int fd, int evtype,void *arg); //事件回调函数int status; // 当前事件是否在epoll集合中: 1表示在, 0表示不在char buffer[MAX_BUFFER_SIZE]; // 读写缓冲区int length; //缓冲区数据的长度long last_active; // 最后一次活跃的时间struct Revent* prev;struct Revent* next;
};// Reactor主体
struct Reactor {//epoll 的fdint epoll_fd; // reactor事件集,是二维结构,因为每个fd可能有多个事件// 第一维下标即为对应的文件描述符, 共有MAX_EPOLL_EVENTS-5个可以用来存放客户端的fd// 因为剩余的5个都有其他用处了。0: 标准输入 1: 标准输出 2:标准错误 3:监听socket// 4:epool专用fdstruct Revent *events[MAX_EPOLL_EVENTS];
};// 创建一个Tcp Server
int init_server(char *ip, short port);
// 向reactor中添加一个服务器监听事件
int reactor_addlistener(struct Reactor *reactor, int fd, NCALLBACK callback);/***下面这3个函数是用来对reactor操作的***/
struct Reactor *reactor_init();
int reactor_destroy(struct Reactor *reactor);
int reactor_run(struct Reactor *reactor);/***下面这3个函数是用来对Revent事件结构操作的***/
int revent_set(struct Revent *ev, int fd, int event, int length, int status, NCALLBACK callback, void *arg);
int revent_add(int epoll_fd, struct Revent* ev);
int revent_del(int epoll_fd, struct Revent* event);/***下面这3个函数是Revent事件可以使用的回调函数***/
// fd: 事件对应的文件描述符 events: 事件的类型(EPOLLIN,EPOLLOUT,...组合) arg: 事件自身句柄
int accept_callback(int fd, int events, void *arg);
int recv_callback(int fd, int events, void *arg);
int send_callback(int fd, int events, void *arg);int init_server(char *ip, short port)
{// 1.创建套接字int sock_fd = socket(AF_INET, SOCK_STREAM, 0);if(sock_fd == -1){printf("Error in %s(), socket: %s\n", __func__, strerror(errno));return -1;}// 2.初始化服务器地址struct sockaddr_in server_addr;memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;if(inet_pton(AF_INET, ip, (void*)&server_addr.sin_addr.s_addr) == -1){printf("Error in %s(), inet_pton: %s\n", __func__, strerror(errno));return -1;}server_addr.sin_port = htons(port);// 3.绑定服务器地址if(bind(sock_fd, (const struct sockaddr*)&server_addr, sizeof(server_addr)) == -1){printf("Error in %s(), bind: %s\n", __func__, strerror(errno));return -1;}// 3.监听if(listen(sock_fd, 20) == -1){printf("Error in %s(), listen: %s\n", __func__, strerror(errno));return -1;}printf("Listen start [%s:%d]...\n", inet_ntoa(server_addr.sin_addr), ntohs(server_addr.sin_port));return sock_fd;
}struct Reactor *reactor_init()
{// 1.创建一个reactorstruct Reactor *reactor = (struct Reactor*)malloc(sizeof(struct Reactor));if(reactor == NULL)return NULL;memset(reactor, 0, sizeof(struct Reactor));// 2.创建reacotr的epoll_fdreactor->epoll_fd = epoll_create(1);if(reactor->epoll_fd == -1){printf("Error in %s(), epoll_create: %s\n", __func__, strerror(errno));free(reactor);return NULL;}return reactor;
}int reactor_destroy(struct Reactor *reactor)
{if(reactor == NULL){printf("Error in %s(): %s\n", __func__, "reactor arg is NULL");return -1;}// 关闭epoll_fd、销毁事件集、释放结构close(reactor->epoll_fd);free(reactor);return 0;
}int reactor_run(struct Reactor *reactor)
{// 1.判断参数if(reactor == NULL || reactor->epoll_fd < 0 || reactor->events == NULL){printf("Error in %s(): %s\n", __func__, "reactor arg is error");return -1;}struct epoll_event ep_events[MAX_EPOLL_EVENTS + 1];// 2.进行epoll_wait()int nready;while(1){nready = epoll_wait(reactor->epoll_fd, ep_events, MAX_EPOLL_EVENTS, 1000);// 3.函数出错if(nready == -1){// 如果epoll_wait函数在阻塞过程中接收到外部信号, 那么继续进行epoll_wait()if(errno == EAGAIN || errno == EWOULDBLOCK)continue;printf("Error in %s(), epoll_wait: %s\n", __func__, strerror(errno));return -1;}// 4.epoll_wait函数超时else if(nready == 0)continue;// 5.有事件准备好else{// 遍历处理已就绪的事件for(int i = 0; i < nready; ++i){struct Revent* ev = (struct Revent*)ep_events[i].data.ptr;int res;int fd = ev->fd;// 如果是可读事件if((ep_events[i].events & EPOLLIN)){struct Revent* evhead = reactor->events[fd];for(ev=evhead;ev!=NULL;ev=ev->next)if(ev->events&EPOLLIN)break;if(ev!=NULL){printf("readable events prepare to handling.\n");res = ev->callback(ev->fd,ev->events, ev);//ev在callback内部可能被释放,因此调用之后不要再访问它了printf("readable result: %d\n",res);}}// 如果是可写事件if((ep_events[i].events & EPOLLOUT)){struct Revent* evhead = reactor->events[fd];for(ev=evhead;ev!=NULL;ev=ev->next)if(ev->events&EPOLLOUT)break;if(ev!=NULL){printf("writable events prepare to handling.\n");res = ev->callback(ev->fd, ev->events,ev);printf("writable result: %d\n",res);//同理,不应再访问ev}}}//end for}}//end whilereturn 0;
}int reactor_addlistener(struct Reactor *reactor, int fd, NCALLBACK callback)
{if(reactor == NULL || fd <0 || callback == NULL){printf("Error in %s(): %s\n", __func__, "arg error");return -1;}// 初始化监听socket的可读事件, 并将该事件添加到reactor的事件集合中struct Revent *ev = (struct Revent*)malloc(sizeof(struct Revent));memset(ev,0,sizeof(struct Revent));revent_set(ev, fd, EPOLLIN, 0, 0, callback, reactor);printf("listening fd=%d\n",fd);int ret = revent_add(reactor->epoll_fd, ev);if(ret<0){free(ev);printf("warning add listenner read event falided.\n");}return 0;
}int revent_set(struct Revent *ev, int fd, int event, int length, int status, NCALLBACK callback, void *arg)
{if(ev == NULL || fd <0 || event <0 || length < 0 || callback == NULL || arg == NULL || status < 0){printf("Error in %s(): %s\n", __func__, "arg error");return -1;}// 初始化Revent结构的相关内容即可ev->fd = fd;ev->events = event;ev->arg = arg;ev->callback = callback;ev->status = status;ev->length = length;ev->last_active = time(NULL);ev->next = NULL;ev->prev = NULL;return 0;
}int revent_add(int epoll_fd, struct Revent* ev)
{if(ev == NULL)return -1;struct Reactor *reactor=(struct Reactor *)ev->arg;if(reactor == NULL || reactor->epoll_fd <0||ev->status!=0)return -1;// 0.将事件加入reactor的事件集合int fd = ev->fd;struct Revent *head = reactor->events[fd];if(head==NULL){reactor->events[fd] = ev;}else{head->prev = ev;ev->next = head;ev->prev = NULL;reactor->events[fd] = ev;}ev->status=1;// 1.将事件注册到epoll事件集合中struct epoll_event ep_event;memset(&ep_event, 0, sizeof(ep_event));ep_event.events = ev->events;ep_event.data.ptr = ev;//ep_event.data.fd = ev->fd; data成员是一个联合体, 不能同时使用fd和ptr成员// 如果当前ev已经在epoll事件表中, 就修改; 否则就把ev新加入到epoll事件表中int op=EPOLL_CTL_ADD;int evtype = 0;int res = epoll_ctl(epoll_fd, op, fd, &ep_event);if( res != 0){if(errno==EEXIST){//printf("this fd is already in epoll set\n");op = EPOLL_CTL_MOD;memset(&ep_event, 0, sizeof(ep_event));ep_event.data.ptr = ev;for(struct Revent *e=ev; e!=NULL; e=e->next)ep_event.events |= e->events;res = epoll_ctl(epoll_fd, op, fd, &ep_event);}}if(res !=0){reactor->events[fd]=reactor->events[fd]->next;ev->status=0;printf("update event for fd=%d falided, epoll_ctl: %s, operator type: %d, error:%d\n",fd, strerror(errno),op,errno);return -1;}printf("update event type for fd=%d success. with type is: %d\n",fd,ep_event.events);return 0;
}int revent_del(int epoll_fd, struct Revent* ev)
{if(ev == NULL)return -1;printf("prepare to del event from reactor events set\n");struct Reactor *reactor=(struct Reactor *)ev->arg;if(reactor == NULL || reactor->epoll_fd <0 ||ev->status != 1){printf("Error in %s(), ev->status=%d\n", __func__, ev->status);return -1;}//把ev从reactor中取出int fd = ev->fd;if(ev == reactor->events[fd])reactor->events[fd] = ev->next;if(ev->prev!=NULL)ev->prev->next = ev->next;if(ev->next!=NULL)ev->next->prev = ev->prev;ev->status = 0;struct epoll_event ep_event;memset(&ep_event, 0, sizeof(ep_event));ep_event.data.ptr = ev;struct Revent* ev_this_fd = reactor->events[fd];int EVENTSTYPE=0;for(;ev_this_fd!=NULL;ev_this_fd=ev_this_fd->next)EVENTSTYPE |= ev_this_fd->events;int op;if(EVENTSTYPE != 0){memset(&ep_event, 0, sizeof(ep_event));ep_event.data.ptr=reactor->events[fd];ep_event.events = EVENTSTYPE;op=EPOLL_CTL_MOD; }else{op=EPOLL_CTL_DEL;} if(epoll_ctl(epoll_fd, op , fd, &ep_event) == -1){printf("Error in %s(), epoll_ctl: %s\n", __func__, strerror(errno));return -1;}return 0;
}int accept_callback(int fd, int what, void* ntyev)
{// 1.获取该fd对应的事件结构struct Revent *ev = (struct Revent*)ntyev;// 2.获得reactor结构struct Reactor *reactor =(struct Reactor*)(ev->arg);// 3.初始化客户端地址结构struct sockaddr_in cli_addr;memset(&cli_addr, 0 , sizeof(cli_addr));socklen_t len = sizeof(cli_addr);// 4.接收客户端int cli_fd;cli_fd = accept(fd, (struct sockaddr*)&cli_addr, &len);if(cli_fd == -1){//fd耗尽了,太多的客户端同时在线,还没有释放。//所以服务端增加超时机制及时关掉超时的客户端,回收fd是尤其必要的if(errno==EMFILE)printf("Error in %s(), accept: %s\n", __func__, strerror(errno));return -1;}// fd 的0、1、2、3、4 都被占用了// 0: 标准输入 1: 标准输出 2:标准错误 3:监听socket 4:epool专用fd// 5.将套接字设置为非阻塞fcntl(cli_fd, F_SETFL, O_NONBLOCK);// setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, &timeout, len)// 如果设置了SO_SNDTIMEO超时, 即便是阻塞的套接字,在recv 和 accept 的时候也会非阻塞// 6.将新事件添加到reactor事件表中struct Revent *cliev_read = (struct Revent*)malloc(sizeof(struct Revent));memset(cliev_read,0,sizeof(struct Revent));revent_set(cliev_read, cli_fd, EPOLLIN, 0, 0, recv_callback, reactor);int ret = revent_add(reactor->epoll_fd, cliev_read);if(ret<0){free(cliev_read);close(cli_fd);printf("Add new client reading event falided.\n");return -1;}printf("New connect: [%s:%d] [client fd=%d] [evtype:%d] [time:%ld]\n", \inet_ntoa(cli_addr.sin_addr), ntohs(cli_addr.sin_port),cli_fd,cliev_read->events,cliev_read->last_active);return cli_fd;
}int recv_callback(int fd, int what, void* ntyev)
{// 1.获取该fd对应的事件结构struct Revent *ev = (struct Revent*)ntyev;// 2.获得reactor结构struct Reactor *reactor =(struct Reactor*)(ev->arg);// 3.接收数据int rc = recv(fd, ev->buffer, MAX_BUFFER_SIZE, 0);if(rc <= 0)//recv出错{// EAGAIN 表示读缓冲区暂时没有可读的数据if(errno == EAGAIN || errno == EWOULDBLOCK)return rc;//ECONNRESET 表示收到了对端内核发送的RST信号,表明对端socket已//经关闭(网络不通的话,将收不到该信号)if(errno==ECONNRESET)printf("Counter part socket has been closed.\n");printf("Error in %s(), %s\n", __func__, strerror(errno));// 把该fd相关的事件全部移除for(struct Revent *tmpev = reactor->events[fd];tmpev!=NULL;){revent_del(reactor->epoll_fd, tmpev);free(tmpev);tmpev = reactor->events[fd];}//close调用时,如果该fd的读缓冲区还剩有数据没取完,将会向对方发送RST包而不是FIN包close(fd);printf("[Close fd=%d]\n",fd);reactor->events[fd]=NULL;} else{ev->buffer[rc] = '\0';printf("Recv[fd = %d]: %s\n", fd, ev->buffer);// 这里应该是解析接收到的数据,并进行处理,根据需求决定是否要写回数据。// 考虑到直接调用send有可能不成功,比如当前时刻写缓冲区满了,不能确定何// 时有空闲,所以也要把可写事件也注册到reactorstruct Revent *ev_write = (struct Revent*)malloc(sizeof(struct Revent));memset(ev_write,0,sizeof(struct Revent));strcpy(ev_write->buffer,ev->buffer);//将收到的数据再发送回去revent_set(ev_write, fd, EPOLLOUT, rc, 0, send_callback, reactor);int ret = revent_add(reactor->epoll_fd, ev_write);if(ret<0){printf("Add client writable event falided.\n");free(ev_write);}}return rc;
}int send_callback(int fd, int what, void* ntyev)
{ // 1.获取该fd对应的事件结构struct Revent *ev = (struct Revent*)ntyev;// 2.获得reactor结构struct Reactor *reactor =(struct Reactor*)(ev->arg);// 3.向发送缓冲区中写入数据int rc = send(ev->fd, ev->buffer, ev->length, 0);//对方可能回复RST// 有可能要发送的内容太多,不能一次性放入发送缓冲区,可以循环发送// 过程中要注意发送缓冲区中待发数据的长度变化,如果长度线性增加且,//说明对方接收异常,可以提前终止,不必等发送缓冲区满填满。/*int value;//socket发送队列中等待发送的数据长度unsigned int sendBufferLen;//socket 发送队列总长度,固定不变size_t optlen = sizeof(sendBufferLen);getsockopt(ev->fd, SOL_SOCKET, SO_SNDBUF, &sendBufferLen, &optlen);ioctl(ev->fd,TIOCOUTQ,&value);//如果发送缓冲区剩余空间不足,调用send会失败while(sendBufferLen-value>ev->length&& rc>0 ){rc = send(ev->fd, ev->buffer, ev->length, 0);ioctl(ev->fd,TIOCOUTQ,&value);//如果数据发完了,可以break出来}*/if(rc > 0) //写入缓冲区成功,且当前时刻还未检测到对方socket异常{printf("Send[fd = %d]: %s\n", ev->fd, ev->buffer);// 如果数据发完,移除可写事件, 避免水平触发方式下的重复触发写事件revent_del(reactor->epoll_fd, ev);free(ev);}else //send写入缓冲区失败(rc<0),检测到对方socket异常{printf("Error in %s(), %s\n", __func__, strerror(errno));if(errno==EPIPE)//处理可以预见的错误,其他的错误暂不处理。EPIPE表示对方异常关闭,等价于recv时收到ECONNRESET信号{printf("EPIPE. Counter part disconnected Unusually.\n");for(struct Revent *tmpev = reactor->events[fd];tmpev!=NULL;){revent_del(reactor->epoll_fd, tmpev);free(tmpev);tmpev = reactor->events[fd];}}else{revent_del(reactor->epoll_fd, ev);free(ev);}}//调用close时如果写缓冲区还有数据没发完,则根据该fd的SO_LINGER策略决定是直接丢弃还是尝试发送直到//SO_LINGER策略超时,超时和直接丢弃都会向对方发送RST报文。if(reactor->events[fd]==NULL) {close(fd);printf("[Close fd=%d]\n",fd);}return rc;
}int main(int argc, char *argv[])
{if(argc != 3){printf("usage: ./%s [ip] [port]\n", basename(argv[0]));exit(EXIT_FAILURE);}//linux系统中,如果向已经断开连接的客户端发送信息,会触发SIGPIPE信号//导致程序退出,这里直接忽略该信号struct sigaction sa;sa.sa_handler = SIG_IGN;sigaction(SIGPIPE,&sa,0);char *ip = argv[1];short port = atoi(argv[2]);int sock_fd;// 1.初始化一个用于监听的socketsock_fd = init_server(ip, port);// 2.初始化reactorstruct Reactor *reactor = reactor_init();if( reactor == NULL){printf("Error in %s(), Reactor_init: create reactor error\n", __func__);exit(1);}// 3.将监听socket添加到reactor事件集中,并指定handler 为accept_callbackreactor_addlistener(reactor, sock_fd, accept_callback);// 4.运行reactorreactor_run(reactor);// 5.销毁reactor_destroy(reactor);//6.关闭监听socketclose(sock_fd);return 0;
}
四. 建议
自行实现的reactor模式,使用起来很繁琐,receive_callback 和send_callback还得操作IO,如果再加上业务逻辑,维护起来相当麻烦,需要十分熟悉底层socket编程才能编出正确的代码。所以实际应用时可以直接使用成熟的开源软件,比如libevent事件通知库,会大大降低开发难度,可以写出稳定易于维护的代码。上述示例仅供学习使用。
五. 补充一个windows系统下的IOCP并发网络模型
#include <iostream>
#define _WINSOCK_DEPRECATED_NO_WARNINGS#define BUFFER_SIZE 1024
#define START_POST_ACCEPTEX 2
#define THREAD_COUNT 2#include <Winsock2.h>
#pragma comment(lib, "Ws2_32.lib")#include <MSWSock.h>
#pragma comment(lib, "Mswsock.lib")struct ServerParams
{SOCKET listenerSock;HANDLE iocp;
};enum IO_OP_TYPE
{IO_ACCEPT,//AcceptEx 投递类型IO_RECV,//WSARecv 投递类型IO_SEND,//WSASend 投递类型IO_CONNECT,//ConnectEx 投递类型IO_DISCONNECT,//DisconnectEx 投递类型IO_EX_SEND=5 //外部投递写
};typedef struct OverlappedPerIO
{OVERLAPPED overlapped;SOCKET socket;WSABUF wsaBuf;IO_OP_TYPE type;char wbuff[BUFFER_SIZE];//发送buffchar rbuff[BUFFER_SIZE];//接收buff
}*LPOverlappedPerIO;typedef struct OverlappedPerIOW//用于延迟写
{OVERLAPPED overlapped;SOCKET socket;WSABUF wsaBuf;IO_OP_TYPE type;char wbuff[BUFFER_SIZE];//发送buffBOOL writeFinished;
}*LPOverlappedPerIOW;void MessageProcess(IN HANDLE handle,IN int fd, IN void* msg, IN DWORD msglen, OUT void* res, IN OUT DWORD* reslen);int InitServer(IN unsigned short port, OUT ServerParams& pms)
{WSADATA wsaData;int ret;ret = WSAStartup(MAKEWORD(2, 2), &wsaData);if (ret == 0){pms.listenerSock = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);if (pms.listenerSock != INVALID_SOCKET){sockaddr_in address;address.sin_family = AF_INET;address.sin_addr.s_addr = INADDR_ANY;address.sin_port = htons(port);ret = _WINSOCK2API_::bind(pms.listenerSock, (SOCKADDR*)&address, sizeof(SOCKADDR));if (ret == 0){ret = listen(pms.listenerSock, SOMAXCONN);if (ret == 0){pms.iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);if (pms.iocp != NULL){if (NULL != CreateIoCompletionPort((HANDLE)pms.listenerSock, pms.iocp, NULL, 0)){return 0;}CloseHandle(pms.iocp);}}}closesocket(pms.listenerSock);}WSACleanup();}if (ret == 0)ret = -1;return ret;
}void PostAcceptEx(SOCKET listenerSock)
{SOCKET sock = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);if (sock == INVALID_SOCKET)return;OverlappedPerIO* overlp = new OverlappedPerIO;if (overlp == nullptr){closesocket(sock);return;}memset(overlp, 0x00, sizeof(OverlappedPerIO));overlp->socket = sock;overlp->wsaBuf.buf = overlp->rbuff;overlp->wsaBuf.len = BUFFER_SIZE;overlp->type = IO_OP_TYPE::IO_ACCEPT;DWORD dwByteRecv = 0;while (AcceptEx(listenerSock, sock, overlp->wsaBuf.buf, 0, sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, &dwByteRecv, (LPOVERLAPPED)overlp) == FALSE){if (WSAGetLastError() == WSA_IO_PENDING){break;}std::cout << "AcceptEx Error: " << WSAGetLastError() << std::endl;}}DWORD WINAPI workthread(LPVOID lpParam)
{ServerParams* pms = (ServerParams*)lpParam;HANDLE complectionPort = pms->iocp;SOCKET listenSock = pms->listenerSock;DWORD bytesTrans;ULONG_PTR completionKey=-1;LPOverlappedPerIO overlp;DWORD errCode;int ret;SOCKADDR_IN client_address;int addr_len = sizeof(SOCKADDR_IN);BOOL quit = FALSE;while (!quit){BOOL result = GetQueuedCompletionStatus(complectionPort, &bytesTrans, &completionKey, (LPOVERLAPPED*)&overlp, INFINITE);if (!result){errCode = GetLastError();if ((errCode == WAIT_TIMEOUT) || (errCode == ERROR_NETNAME_DELETED)||(errCode == ERROR_OPERATION_ABORTED)){if(overlp != NULL){std::cout << "socket disconnect, fd = " << overlp->socket << std::endl;closesocket(overlp->socket);delete overlp;}continue;}std::cout << "GetQueuedCompletionStatus failed, Maybe IOCP handle has been closed." << std::endl;break;}if (overlp == NULL){std::cout << "GetQueuedCompletionStatus Quit Normally." << std::endl;break;}switch (overlp->type){case IO_OP_TYPE::IO_ACCEPT:{PostAcceptEx(listenSock);std::cout << "happened IO_ACCEPT: " << bytesTrans<<std::endl;setsockopt(overlp->socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&listenSock, sizeof(SOCKET));getpeername(overlp->socket, (struct sockaddr*)&client_address, &addr_len);std::cout << "client accepted, ip = " << inet_ntoa(client_address.sin_addr) << " port = " << ntohs(client_address.sin_port) << std::endl;ZeroMemory(overlp->rbuff, BUFFER_SIZE);ZeroMemory(overlp->wbuff, BUFFER_SIZE);overlp->type = IO_OP_TYPE::IO_RECV;overlp->wsaBuf.buf = overlp->rbuff;overlp->wsaBuf.len = BUFFER_SIZE;CreateIoCompletionPort((HANDLE)overlp->socket, complectionPort, NULL, 0);DWORD dwRecv = 0, dwFlag = 0;ret = WSARecv(overlp->socket, &overlp->wsaBuf, 1, &dwRecv, &dwFlag, &(overlp->overlapped), 0);if (ret == SOCKET_ERROR){errCode = GetLastError();if (errCode != WSA_IO_PENDING)std::cout << "WSARecv failed: " << errCode << std::endl;}}break;case IO_OP_TYPE::IO_RECV:{std::cout << "happened IO_RECV, Data length = " << bytesTrans << std::endl;if (bytesTrans == 0){std::cout << "socket disconnect, fd = " << overlp->socket << std::endl;closesocket(overlp->socket);delete overlp;continue;}DWORD dwSend = 0, dwRecv=0, dwFlag = 0;DWORD replyBufferSz = BUFFER_SIZE;ZeroMemory(overlp->wbuff, BUFFER_SIZE);MessageProcess(complectionPort,overlp->socket, overlp->rbuff, bytesTrans,overlp->wbuff,&replyBufferSz);ZeroMemory(overlp->rbuff, BUFFER_SIZE);if (replyBufferSz < 1){overlp->type = IO_OP_TYPE::IO_RECV;overlp->wsaBuf.buf = overlp->rbuff;overlp->wsaBuf.len = BUFFER_SIZE;ret = WSARecv(overlp->socket, &overlp->wsaBuf, 1, &dwRecv, &dwFlag, &(overlp->overlapped), 0);}else {overlp->type = IO_OP_TYPE::IO_SEND;overlp->wsaBuf.len = replyBufferSz;overlp->wsaBuf.buf = overlp->wbuff;ret = WSASend(overlp->socket, &overlp->wsaBuf, 1, &dwSend, 0, &(overlp->overlapped), 0);}if (ret == SOCKET_ERROR){errCode = GetLastError();if (errCode != WSA_IO_PENDING)std::cout << "WSASend/WSARecv failed: " << errCode << std::endl;}}break;case IO_OP_TYPE::IO_SEND:{std::cout << "happened IO_SEND: " << bytesTrans << std::endl;if (bytesTrans == 0){std::cout << "socket disconnect, fd = " << overlp->socket << std::endl;closesocket(overlp->socket);delete overlp;continue;}ZeroMemory(overlp->wbuff, BUFFER_SIZE);ZeroMemory(overlp->rbuff, BUFFER_SIZE);overlp->type = IO_OP_TYPE::IO_RECV;overlp->wsaBuf.buf = overlp->rbuff;overlp->wsaBuf.len = BUFFER_SIZE;DWORD dwRecv = 0, dwFlag = 0;ret = WSARecv(overlp->socket, &overlp->wsaBuf, 1, &dwRecv, &dwFlag, &(overlp->overlapped), 0);if (ret == SOCKET_ERROR){errCode = GetLastError();if (errCode != WSA_IO_PENDING)std::cout << "WSARecv failed: " << errCode << std::endl;}}break;case IO_OP_TYPE::IO_EX_SEND:{LPOverlappedPerIOW overlpw = (LPOverlappedPerIOW)overlp;std::cout << "overlpw->type: "<<overlpw->type<<", overlpw->writeFinished: " << overlpw->writeFinished<< std::endl;if (overlpw->writeFinished) {delete overlpw;break;}overlpw->writeFinished = TRUE;DWORD dwSend = 0, dwFlag = 0;ret = WSASend(overlpw->socket, &overlpw->wsaBuf, 1, &dwSend, 0, &(overlpw->overlapped), 0);if (ret == SOCKET_ERROR){errCode = GetLastError();if (errCode != WSA_IO_PENDING) {std::cout << "WSASend failed: " << errCode << std::endl;}}}break;default:break;}}//_endthread();//<process.h>return 0;
}void MessageProcess(IN HANDLE handle,IN int fd, IN void* msg, IN DWORD msglen, OUT void* res, IN OUT DWORD* reslen)
{std::string str((char*)msg, msglen);std::cout << "msg content =" << str << std::endl;//*reslen = sprintf_s((char*)res, *reslen, "reply from server socket, with fd=%d.\n",fd);*reslen = 0;//根据业务情况,如果无法立即返回结果,则可以在这里置为0,然后在合适的时机,通过PostQueuedCompletionStatus投递写事件//*********************************** examples for write delay*******************************LPOverlappedPerIOW overlpw = new OverlappedPerIOW;memset(overlpw, 0x00, sizeof(OverlappedPerIOW));overlpw->socket = fd;overlpw->type = IO_OP_TYPE::IO_EX_SEND;overlpw->wsaBuf.len = sprintf_s(overlpw->wbuff, BUFFER_SIZE, "reply from server socket, with fd=%d.\n", fd);overlpw->wsaBuf.buf = overlpw->wbuff;PostQueuedCompletionStatus(handle, 0, 0, &(overlpw->overlapped));//********************************** end for examples***************************************
}int main()
{int ret;ServerParams pms{ 0 };ret = InitServer(50721, pms);if (ret != 0){std::cout << "Init server failed." << std::endl;return -1;}for (int i = 0; i < THREAD_COUNT; i++){CreateThread(NULL, 0, workthread, &pms, 0, NULL);}for (int i = 0; i < START_POST_ACCEPTEX; i++){PostAcceptEx(pms.listenerSock);}getchar();for (int i = 0; i < THREAD_COUNT; i++) {PostQueuedCompletionStatus(pms.iocp, 0, 0, NULL);}Sleep(1000);//wait for thread quitclosesocket(pms.listenerSock);CloseHandle(pms.iocp);WSACleanup();return 0;
}