《网络编程基础之完成端口模型》
【完成端口模型导读】完成端口模型,算得上是真正的异步网络IO模型吧,相对于其它网络IO模型,操作系统通知我们的时候,要么就是连接已经帮我建立好,客户端套接字帮我们准备好;要么就是数据已经接收完成;要么就是本端的数据已经发送出去了。我们只需准备接收数据的容器以及客户端的套接字即可。
1、重难点分析
使用完成端口模型的几个主要步骤:
1、创建完成端口模型
2、创建监听socket,并将监听socket绑定到指定的IP地址和端口
3、将监听socket绑定到完成端口模型上去
以上步骤可以使用如下的示例代码进行概述:
HANDLE hCompletePort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);//监听socket,必须使用WSASocket接口创建,而且要加上WSA_FLAG_OVERLAPPED标记 HANDLE listenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);// 将监听Socket绑定到完成端口struct per_socket_context listenContext;//per_socket_context,下文会介绍。if( NULL== CreateIoCompletionPort((HANDLE)listenSocket, hCompletePort,(ULONG_PTR)&listenContext,0)) { return;} //开启监听struct sockaddr_in serverAddress;// 填充地址信息ZeroMemory((char *)&serverAddress, sizeof(serverAddress));serverAddress.sin_family = AF_INET;// 这里可以绑定任何可用的IP地址,或者绑定一个指定的IP地址 serverAddress.sin_addr.s_addr = htonl(INADDR_ANY); serverAddress.sin_port = htons(8000);if (SOCKET_ERROR = bind(listenSocket, (struct sockaddr_in*)&serverAddress,sizeof(serverAddress))){return;}if (listen(listenSocket, SOMAXCONN) == SOCKET_ERROR){return;}
正如前文所说,完成端口模型帮我把连接建立,我们只需要准备好客户端套接字即可,可以这么理解吧!应用层只需要投递个接收连接的请求,等到操作系统通知我们的时候,连接已经建立好,客户端套接字已经生效,它表示一个合法的"客户端对象",此时可以通过这个客户端套接字和对端建立连接,进行数据通信。
那投递接收连接请求的接口是哪个? 就是AcceptEx。
1、接口声明
BOOL AcceptEx(_In_ SOCKET sListenSocket, //监听套接字_In_ SOCKET sAcceptSocket, //客户端套接字,使用完,下次投递请求的时候需再补充一个_In_ PVOID lpOutputBuffer, //接收在新连接上发送的第一个数据块_In_ DWORD dwReceiveDataLength, //实际接收数据的字节数_In_ DWORD dwLocalAddressLength, //本地地址信息保留的字节数_In_ DWORD dwRemoteAddressLength, //远程地址信息保留的字节数_Out_ LPDWORD lpdwBytesReceived, //指向接收接收字节计数的DWORD指针_In_ LPOVERLAPPED lpOverlapped //用于处理请求的 OVERLAPPED 结构。必须指定此参数;它不能为 NULL
);2、接口返回值AcceptEx函数成功完成,返回值为TRUE 。函数失败,AcceptEx返回FALSE。可调用WSAGetLastError返回扩展错误信息。如果 WSAGetLastError 返回 ERROR_IO_PENDING,则表示操作已成功启动,仍在进行中。 如果是其它错误,说明真的出错了。
虽然微软提供了接口的声明,根据msdn的官方声明,我们还是使用其它的方式去获取AcceptEx函数指针,具体的方式如下:
DWORD dwBytes = 0;
if(SOCKET_ERROR == WSAIoctl(listenSocket, //任意一个有效的socket即可 SIO_GET_EXTENSION_FUNCTION_POINTER, &GuidAcceptEx, sizeof(GuidAcceptEx), &m_lpfnAcceptEx, sizeof(m_lpfnAcceptEx), &dwBytes, NULL, NULL))
{return;
}
m_lpfnAcceptEx就是我们拿到的AcceptEx函数指针,我们可以使用它来投递一个接收连接的请求。
好!介绍完投递接收连接请求的接口,我们再看看投递接收数据的接口又是怎样的?
int WSAAPI WSARecv(SOCKET s, // 客户套接字LPWSABUF lpBuffers,//指向 WSABUF 结构数组的指针DWORD dwBufferCount, //lpBuffers数组中的WSABUF结构数。LPDWORD lpNumberOfBytesRecvd,//接收的数据数(以字节为单位)的指针LPDWORD lpFlags,//默认为0LPWSAOVERLAPPED lpOverlapped, //overlapped结构指针LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);
那如何投递接收数据的请求:
// 初始化变量
DWORD dwFlags = 0;
DWORD dwBytes = 0;
WSABUF *Wbuf;
OVERLAPPED *pol = &pIoContext->m_Overlapped;
pIoContext->ResetBuffer();
pIoContext->m_OpType = RECV_POSTED;
int nBytesRecv = WSARecv(clientSock, Wbuf, 1, &dwBytes, &dwFlags, pol, NULL );if ((SOCKET_ERROR == nBytesRecv) && (WSA_IO_PENDING != WSAGetLastError()))return;
投递发送数据的请求也是类似:
// 初始化变量
DWORD dwFlags = 0;
DWORD dwBytes = 0;
WSABUF *p_wbuf;
OVERLAPPED *pol = &pIoContext->m_Overlapped;
pIoContext->ResetBuffer();
pIoContext->m_OpType = SEND_POSTED;
// 初始化完成后,投递WSARecv请求
int nBytesSend = WSASend(ClientSock, p_wbuf, 1, &dwBytes, &dwFlags, p_ol, NULL);
// 如果返回值错误,并且错误的代码并非是Pending的话,那就说明这个重叠请求失败了
if ((SOCKET_ERROR == nBytesSend) && (WSA_IO_PENDING != WSAGetLastError()))
{return ;
}
投递成功后,如果对端有数据发送过来,那么完成端口的工作线程便会被唤醒,我们将从工作线程拿到对端发送过来的数据;亦或数据成功发送给对端,那么工作线程也会被唤醒,这个时候是继续投递发送请求还是投递一个接收数据的请求?就看自己的业务需要了。
正如上文所说,投递完请求,工作线程被唤醒,这个时候如何区分是"发送"成功还是"接收"成功?是监听套接字的事件还是客户端套接字的事件。介绍完如何区分这些差别之前,我们先看看工作线程是被什么API给挂起的?以至于有事件来时能被唤醒。
BOOL WINAPI GetQueuedCompletionStatus(_In_ HANDLE CompletionPort, //完成端口句柄_Out_ LPDWORD lpNumberOfBytes, //指向接收已完成 I/O 操作中传输的字节数的变量的指针。/*指向和完成端口相关附加数据,和CreateIoCompletionPort第三个参数是同一个指针,指向同一份内存。因此在将socket绑定到完成端口时,可以在lpCompletionKey中指定socket的类型那么工作线程就知道当前的事件是连接套接字产生的还是客户端套接字产生的*/_Out_ PULONG_PTR lpCompletionKey, _Out_ LPOVERLAPPED *lpOverlapped,_In_ DWORD dwMilliseconds
);
像lpCompletionKey和socket相关的结构体指针,我们称之为单socket数据,具体的用法如下:
typedef struct _PER_SOCKET_CONTEXT
{//每一个客户端连接的SocketSOCKET m_Socket; //客户端的地址SOCKADDR_IN m_ClientAddr;//客户端网络操作的上下文数据,CArray<_PER_IO_CONTEXT*> m_arrayIoContext;
};//对于侦听socket
_PER_SOCKET_CONTEXT listenSocketContext;
listenSocketContext.s = ListenSocket;
CreateIoCompletionPort(ListenSocket, m_hIOCompletionPort, (ULONG_PTR)&listenSocketContext,0);//对于普通客户端连接socket
_PER_SOCKET_CONTEXT clientSocketContext;
clientSocketContext.s = acceptSocket;
CreateIoCompletionPort(acceptSocket, m_hIOCompletionPort,(ULONG_PTR)&clientSocketContext,0);
那么工作线程可以从对应的socketContext中解析出socket的值,具体是监听socket还是客户端socket,那可以做区分处理。
DWORD ThreadFunction()
{OVERLAPPED *pOverlapped = NULL;_PER_SOCKET_CONTEXT *pSocketContext = NULL;DWORD dwBytesTransfered = 0;BOOL bReturn = GetQueuedCompletionStatus(m_hIOCompletionPort,&dwBytesTransfered, (PULONG_PTR)&pSocketContext, &pOverlapped, INFINITE);if (((SOME_STRUCT*)pSocketContext)->s == 监听socket){//新连接接收成功,做一些操作}else //客户端套接字{if (事件类型 == 收到了一份数据){//解析数据}else if (事件类型 == 数据发送成功了){//继续投递发送数据请求}}
}
上述伪码中,假如是客户端套接字的事件,那么如何去区分事件类型,到底是"数据发送成功"还是"数据接收成功"?PER_SOCKET_DATA中是没有表明事件类型的。具体是发送数据还是接收数据,我们在投递发送请求或者接收请求的时候,是可以标注的,我们再回过头看看WSARecv接口的声明。
int WSAAPI WSARecv(SOCKET s, // 客户套接字......LPWSAOVERLAPPED lpOverlapped, //overlapped结构指针LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);
OVERLAPPED结构体指针lpOverlapped,基于指针的伸缩性,我们可以通过偏移访问指定范围的内存数据,假如我们在OVERLAPPED结构后面附加一些额外的数据用于标识当前的IO行为是"接收"还是"发送",那么工作线程被唤醒时,就可以通过lpOverlapped指针取出附加在OVERLAPPED后面的数据了,那么具体的事件类型就能很轻松获取到了。针对这种标识IO行为的结构体,我们称为PER_IO_DATA。
typedef struct _PER_IO_CONTEXT
{// 每一个重叠网络操作的重叠结构(针对每一个Socket的每一个操作,都要有一个) OVERLAPPED m_Overlapped;// 这个网络操作所使用的SocketSOCKET m_sockAccept; // WSA类型的缓冲区,用于给重叠操作传参数的WSABUF m_wsaBuf; // 这个是WSABUF里具体存字符的缓冲区char m_szBuffer[MAX_BUFFER_LEN]; // 标识网络操作的类型(对应上面的枚举), 发送或者接收事件OPERATION_TYPE m_OpType;
};
那上文提到的工作线程就可以这样做改造了:
DWORD ThreadFunction()
{OVERLAPPED *pOverlapped = NULL;_PER_SOCKET_CONTEXT *pSocketContext = NULL;DWORD dwBytesTransfered = 0;BOOL bReturn = GetQueuedCompletionStatus(m_hIOCompletionPort,&dwBytesTransfered, (PULONG_PTR)&pSocketContext, &pOverlapped, INFINITE);if (((SOME_STRUCT*)pSocketContext)->s == 监听socket){//新连接接收成功,做一些操作}else //客户端套接字{//推荐使用微软封装的接口进行转换PER_IO_CONTEXT* pIoContext = CONTAINING_RECORD(pOverlapped, PER_IO_CONTEXT, m_Overlapped);if (pIOContext->Type == 接收){//解析数据}else if (pIOContext->Type == 发送){}}
}
有事件产生时,工作线程会被唤醒,假如我们想主动退出工作线程,那这个时候该如何唤醒挂在GetQueuedCompletionStatus接口的线程,既然有Get接口,那肯定就有Post接口;微软提供如下接口用于唤醒挂在GetQueuedCompletionStatus接口的线程。
BOOL WINAPI PostQueuedCompletionStatus(_In_ HANDLE CompletionPort,_In_ DWORD dwNumberOfBytesTransferred,_In_ ULONG_PTR dwCompletionKey,_In_opt_ LPOVERLAPPED lpOverlapped
);
假设我们将第三个参数dwCompletionKey指向一个特殊的标识符(EXIT_CODE),那么工作线程被唤醒时,是不是可以从dwCompletionKey解析出这个特殊的标识符(EXIT_CODE),随后退出工作线程。
PostQueuedCompletionStatus(m_hIOCompletionPort, 0, (DWORD)EXIT_CODE, NULL);DWORD ThreadFunction()
{OVERLAPPED *pOverlapped = NULL;PER_SOCKET_CONTEXT *pSocketContext = NULL;DWORD dwBytesTransfered = 0;BOOL bReturn = GetQueuedCompletionStatus(m_hIOCompletionPort, &dwBytesTransfered, (PULONG_PTR)&pSocketContext, &pOverlapped, INFINITE);//收到退出标志,退出线程if ( EXIT_CODE==(DWORD)pSocketContext ){return 0;}......
}
2、完整示例代码
IOCPModel.cpp
#include "CIOCPModel.h"#define DEFAULT_NUM_OF_PROCESSOR 2#define RELEASE_HANDLE(x) if(x != NULL && x != INVALID_HANDLE_VALUE ) if(x != NULL){ CloseHandle(x); x = NULL;}
#define RELEASE_SOCKET(x) if(x!= NULL && x!= INVALID_SOCKET){closesocket(x); x = NULL;}using namespace std;DWORD __stdcall CIOCPModel::_WorkThread(LPVOID lpParam)
{THREADPARAMS_WORKER *pThreadParam = (PTHREADPARAMS_WORKER)lpParam;CIOCPModel *pIoModel = pThreadParam->pIOCPModel;size_t ThreadNo = pThreadParam->dwThreadNo;//线程等到系统的IO通知,需要准备三个数据结构OVERLAPPED *pOverLapped = NULL;PER_SOCKET_CONTEXT *perSocket = NULL;DWORD dwByteTransfered = 0;while (WAIT_OBJECT_0 != WaitForSingleObject(pIoModel->m_hShutDownEvent, 0)){bool bFlag = GetQueuedCompletionStatus(pIoModel->m_hIOCompletionPort,&dwByteTransfered, (PULONG_PTR)&perSocket, &pOverLapped, INFINITE);OVERLAPPED m_Overlapped;//先处理主动退出的逻辑4if(EXIT_CODE == (DWORD)perSocket){break;}if (!bFlag){continue;}else{PER_IO_CONTEXT *perIoContext = CONTAINING_RECORD(pOverLapped, PER_IO_CONTEXT, m_Overlapped);if ((dwByteTransfered == 0) &&(perIoContext->m_OperatorType == RECV_POSTED|| perIoContext->m_OperatorType == SEND_POSTED)){cout << "Client is closed." << endl;pIoModel->_RemoveContextList(perSocket);continue;}switch (perIoContext->m_OperatorType){case RECV_POSTED://pIoModel->_PostRecv(perIoContext);pIoModel->_DoRecv(perSocket, perIoContext);break;case SEND_POSTED:pIoModel->_PostSend(perIoContext);break;case ACCEPT_POSTED:pIoModel->_DoAccept(perSocket, perIoContext);break;default:break;}}}if (pThreadParam != NULL){delete pThreadParam;}return 0;
}CIOCPModel::CIOCPModel():m_nThreads(0),m_hShutDownEvent(NULL),m_hIOCompletionPort(NULL),m_lpfnAcceptEx(NULL),m_lpfnGetAcceptExSockAddrs(NULL),m_WorkThread(NULL),m_strIP(DEFAULT_IP),m_port(DEFAULT_PORT),pListenSocketContext(NULL){}CIOCPModel::~CIOCPModel()
{this->stop();
}bool CIOCPModel::start()
{InitializeCriticalSection(&m_cs);m_hShutDownEvent = CreateEvent(NULL, TRUE, FALSE, NULL);if (this->_InitIOCPModel()){std::cout << "initialize IOCP Model success." << std::endl;}else{std::cout << "initialize IOCP Model failed." << std::endl;return false;}//初始化监听socketif (this->_InitializeListenSocket()){std::cout << "initialize Listen socket success" << std::endl;}else{std::cout << "initialize Listen socket error" << std::endl;return false;}return true;
}
void CIOCPModel::stop()
{//主动退出,OVERLAPPED *pOverlapped = NULL;PER_SOCKET_CONTEXT *perSocketContext = NULL;DWORD dwByteTransfered = 0;if (pListenSocketContext != NULL && pListenSocketContext->m_Socket != INVALID_SOCKET){//先将m_hShutDownEvent设置为有信号的状态SetEvent(m_hShutDownEvent);for (int i = 0; i < m_nThreads; i++){PostQueuedCompletionStatus(m_hIOCompletionPort, dwByteTransfered, (DWORD)EXIT_CODE,pOverlapped);}WaitForMultipleObjects(m_nThreads, m_WorkThread, true, INFINITE);}
}
bool CIOCPModel::LoadSocketLib()
{return true;
}
void CIOCPModel::unLoadSocketLib()
{}
std::string CIOCPModel::GetLocalIP()
{char hostName[256] = { 0 };gethostname(hostName, 256);struct hostent FAR* lpHostEnt = gethostbyname(hostName);if (lpHostEnt == NULL)return "127.0.0.1";LPSTR lpAddr = lpHostEnt->h_addr_list[0];struct in_addr inAddr;memmove(&inAddr, lpAddr, 4);m_strIP = inet_ntoa(inAddr);return m_strIP;
}
void CIOCPModel::setPort(const int &m_port)
{}bool CIOCPModel::_InitIOCPModel()
{//创建完成端口模型m_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 0);if (m_hIOCompletionPort == NULL){std::cout << "创建完成端口模型失败" << std::endl;return false;}//初始化一个线程池,经验值:cpu的数量乘以2m_nThreads = 2 * _GetNoOfProcessor();m_WorkThread = new HANDLE[m_nThreads];DWORD nThreadId;for (int i = 0; i < m_nThreads; i++){THREADPARAMS_WORKER *pWorkThreadParam = new THREADPARAMS_WORKER;pWorkThreadParam->pIOCPModel = this;pWorkThreadParam->dwThreadNo = i + 1;m_WorkThread[i] = ::CreateThread(0, 0, _WorkThread, (LPVOID)pWorkThreadParam, 0, &nThreadId);}std::cout << "创建了 " << m_nThreads << " 个线程" << std::endl;return true;
}bool CIOCPModel::_InitializeListenSocket()
{GUID GuidAcceptEx = WSAID_ACCEPTEX;GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;struct sockaddr_in serverAddr;pListenSocketContext = new PER_SOCKET_CONTEXT;pListenSocketContext->m_Socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL,0, WSA_FLAG_OVERLAPPED);if (pListenSocketContext->m_Socket == INVALID_SOCKET){std::cout << "create listen socket error" << std::endl;return false;}//将监听套接字绑定到完成端口上if (NULL == CreateIoCompletionPort((HANDLE)pListenSocketContext->m_Socket,m_hIOCompletionPort, (DWORD)pListenSocketContext, 0)){std::cout << "Associate completion port to listen socket error" << std::endl;return false;}//绑定地址ZeroMemory(&serverAddr, sizeof(serverAddr));serverAddr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);serverAddr.sin_port = htons(DEFAULT_PORT);serverAddr.sin_family = AF_INET;if (bind(pListenSocketContext->m_Socket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) == -1){std::cout << "bind error" << std::endl;return false;}if (-1 == listen(pListenSocketContext->m_Socket, SOMAXCONN)){std::cout << "listen error" << std::endl;return false;}DWORD dwByets = 0;if (WSAIoctl(pListenSocketContext->m_Socket,SIO_GET_EXTENSION_FUNCTION_POINTER,&GuidAcceptEx,sizeof(GuidAcceptEx),&m_lpfnAcceptEx,sizeof(m_lpfnAcceptEx),&dwByets,0,0) == -1){std::cout << "get acceptex error" << std::endl;return false;}if (WSAIoctl(pListenSocketContext->m_Socket,SIO_GET_EXTENSION_FUNCTION_POINTER,&GuidGetAcceptExSockAddrs,sizeof(GuidGetAcceptExSockAddrs),&m_lpfnGetAcceptExSockAddrs,sizeof(m_lpfnGetAcceptExSockAddrs),&dwByets,0,0) == -1){std::cout << "get GuidGetAcceptExSockAddrs error" << std::endl;return false;}//事先准备五个客户端套接字for (int i = 0; i < 10; i++){PER_IO_CONTEXT *PerIoContext = pListenSocketContext->GetNewIOContext();if (this->_PostAccept(PerIoContext) == false){std::cout << "post accept error" << std::endl;return false;}}return true;
}void CIOCPModel::_DeInitializeIOCPModel()
{DeleteCriticalSection(&m_cs);RELEASE_HANDLE(m_hShutDownEvent);for (int i = 0; i < m_nThreads; i++){RELEASE_HANDLE(this->m_WorkThread[i]);}delete []this->m_WorkThread;RELEASE_HANDLE(m_hIOCompletionPort);delete pListenSocketContext;return;
}bool CIOCPModel::_PostAccept(PER_IO_CONTEXT *pAcceptIoContext)
{//投递Accept请求,需要事先准备好客户端套接字assert(pListenSocketContext->m_Socket != INVALID_SOCKET);DWORD dwBytes = 0;pAcceptIoContext->m_OperatorType = ACCEPT_POSTED;WSABUF *wsaBuf = &pAcceptIoContext->m_wsaBuf;OVERLAPPED *p_ol = &pAcceptIoContext->m_Overlapped;pAcceptIoContext->m_AcceptSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL,0, WSA_FLAG_OVERLAPPED);if (pAcceptIoContext->m_AcceptSocket == INVALID_SOCKET){std::cout << "create accept socket error" << std::endl;return false;}if (false == m_lpfnAcceptEx(pListenSocketContext->m_Socket, pAcceptIoContext->m_AcceptSocket,wsaBuf->buf, wsaBuf->len - (sizeof(SOCKADDR_IN) + 16) *2 ,sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16,&dwBytes, p_ol)){DWORD dwError = WSAGetLastError();if (dwError != WSA_IO_PENDING){std::cout << "Post Accept error" << std::endl;return false;}}return true;
}
bool CIOCPModel::_PostRecv(PER_IO_CONTEXT *PerContext)
{//投递Recv请求DWORD dwBytes = 0;DWORD dwFlag = 0;WSABUF *m_wsaBuf = &PerContext->m_wsaBuf;OVERLAPPED *o_lp = &PerContext->m_Overlapped;PerContext->ResetBuffer();PerContext->m_OperatorType = RECV_POSTED;int dwBytesReturn = WSARecv(PerContext->m_AcceptSocket, m_wsaBuf, 1, &dwBytes, &dwFlag, o_lp, NULL);if (dwBytesReturn == -1){if (WSAGetLastError() != WSA_IO_PENDING){std::cout << "post recv error " << WSAGetLastError() << std::endl;return false;}}//cout << "ThreadId: "<< GetCurrentThreadId() << " recv from client: " << m_wsaBuf->buf << " Len: " << m_wsaBuf->len << endl;return true;
}bool CIOCPModel::_PostSend(PER_IO_CONTEXT *PerContext)
{DWORD dwBytes = 0;DWORD dwFlag = 0;WSABUF *m_wsaBuf = &PerContext->m_wsaBuf;OVERLAPPED *o_l = &PerContext->m_Overlapped;PerContext->m_OperatorType = SEND_POSTED;int dwBytesReturn = WSASend(PerContext->m_AcceptSocket, m_wsaBuf, 1, &dwBytes, dwFlag, o_l, NULL);if (dwBytesReturn == -1){if (WSAGetLastError() == WSA_IO_PENDING){cout << "Post send error" << endl;return false;}}return true;
}bool CIOCPModel::_DoAccept(PER_SOCKET_CONTEXT *PerSocket, PER_IO_CONTEXT *PerContext)
{SOCKADDR_IN *ClientAddr = NULL;SOCKADDR_IN *ServerAddr = NULL;int dwRemoteAddrLen = sizeof(SOCKADDR_IN);int dwLocalAddrLen = sizeof(SOCKADDR_IN);//把客户端和服务端的IP地址拿到,顺带把服务端发过来的第一组数据给拿到this->m_lpfnGetAcceptExSockAddrs(PerContext->m_wsaBuf.buf,PerContext->m_wsaBuf.len - (sizeof(SOCKADDR_IN) + 16) * 2,sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16,(LPSOCKADDR *)&ServerAddr, &dwLocalAddrLen, (LPSOCKADDR *)&ClientAddr, &dwRemoteAddrLen);std::cout << "客户端地址:" << inet_ntoa(ClientAddr->sin_addr) << " 端口: " << ClientAddr->sin_port << std::endl;std::cout << "服务端地址: " << inet_ntoa(ServerAddr->sin_addr) << " 端口: " << ServerAddr->sin_port << std::endl;std::cout << "客户端的发来的数据: " << PerContext->m_wsaBuf.buf << endl;PER_SOCKET_CONTEXT *PerSocketContext = new PER_SOCKET_CONTEXT;PerSocketContext->m_Socket = PerContext->m_AcceptSocket;memcpy(&PerSocketContext->m_ClientAddr, ClientAddr, sizeof(SOCKADDR_IN));//将Accept Socket和完成端口绑定在一起if (false == this->_AssociateWithIOCP(PerSocketContext)){std::cout << "Associate accept scket error" << std::endl;return false;}PER_IO_CONTEXT *PerIoContext = PerSocketContext->GetNewIOContext();PerIoContext->ResetBuffer();PerIoContext->m_OperatorType = RECV_POSTED;PerIoContext->m_AcceptSocket = PerSocketContext->m_Socket;//开始投递一个Recv请求if (false == this->_PostRecv(PerIoContext)){if (WSAGetLastError() != WSA_IO_PENDING){cout << "Do Accept Post Recv failed." << WSAGetLastError() << endl;PerSocket->RemoveIOContext(PerIoContext);return false;}}//投递成功,将该请求添加到PerSocketContext队列this->_AddToContextList(PerSocket);//清空缓冲区PerIoContext->ResetBuffer();return this->_PostRecv(PerIoContext);
}bool CIOCPModel::_DoRecv(PER_SOCKET_CONTEXT *PerSocketContext, PER_IO_CONTEXT *PerIoContext)
{SOCKADDR_IN *ClientAddr = &PerSocketContext->m_ClientAddr;cout << "Client Addr: " << inet_ntoa(ClientAddr->sin_addr) << " Port: "<< ClientAddr->sin_port << " "<< PerIoContext->m_wsaBuf.buf << " len: "<< PerIoContext->m_wsaBuf.len << endl;//开始投递下一个请求return this->_PostRecv(PerIoContext);
}void CIOCPModel::_AddToContextList(PER_SOCKET_CONTEXT *pSocketContext)
{EnterCriticalSection(&m_cs);m_ClientContextDeque.push_back(pSocketContext);LeaveCriticalSection(&m_cs);return;
}void CIOCPModel::_RemoveContextList(PER_SOCKET_CONTEXT *pSocketContext)
{EnterCriticalSection(&m_cs);PER_SOCKET_CONTEXT *pTempSocket = m_ClientContextDeque.front();m_ClientContextDeque.pop_front();delete pTempSocket;LeaveCriticalSection(&m_cs);
}void CIOCPModel::_ClearContextList()
{EnterCriticalSection(&m_cs);m_ClientContextDeque.clear();LeaveCriticalSection(&m_cs);
}bool CIOCPModel::_AssociateWithIOCP(PER_SOCKET_CONTEXT *pSocketContext)
{//将对应的句柄绑定到完成端口中区HANDLE hCompletePort = CreateIoCompletionPort((HANDLE)pSocketContext->m_Socket,m_hIOCompletionPort, (DWORD)pSocketContext, 0);if (hCompletePort == NULL){cout << "Associate Io Complettion Port failed. " << endl;return false;}return true;
}bool CIOCPModel::HandleError(PER_SOCKET_CONTEXT *pSocketContext, const DWORD &dwError)
{return true;
}int CIOCPModel::_GetNoOfProcessor()
{SYSTEM_INFO si;GetSystemInfo(&si);return si.dwNumberOfProcessors;
}bool CIOCPModel::_IsSocketAlive(SOCKET s)
{int nBytesSend = send(s, "a", 1, 0);if (nBytesSend == -1)return false;return true;
}
IOCPModel.h
#pragma once
#ifndef _C_IOCP_MODEL_H_
#define _C_IOCP_MODEL_H_
#include <iostream>
#include <WinSock2.h>
#include <algorithm>
#include <deque>
#include <vector>
#include <assert.h>
#include <MSWSock.h>
#pragma comment(lib,"ws2_32.lib")#define MAX_BUFFER_LEN 8192
#define DEFAULT_PORT 12345
#define DEFAULT_IP "127.0.0.1"#define EXIT_CODE 0x111typedef enum _OPERATION_TYPE
{ACCEPT_POSTED,SEND_POSTED,RECV_POSTED,NULL_POSTED
}OPERATION_TYPE;typedef struct _PER_IO_CONTEXT
{OVERLAPPED m_Overlapped;SOCKET m_AcceptSocket;WSABUF m_wsaBuf;char m_szBuf[MAX_BUFFER_LEN];OPERATION_TYPE m_OperatorType;_PER_IO_CONTEXT(){ZeroMemory(&m_Overlapped, sizeof(m_Overlapped));ZeroMemory(m_szBuf, MAX_BUFFER_LEN);m_AcceptSocket = INVALID_SOCKET;m_wsaBuf.buf = m_szBuf;m_wsaBuf.len = MAX_BUFFER_LEN;m_OperatorType = NULL_POSTED;}~_PER_IO_CONTEXT(){if (m_AcceptSocket != INVALID_SOCKET){closesocket(m_AcceptSocket);m_AcceptSocket = INVALID_SOCKET;}}void ResetBuffer(){ZeroMemory(m_szBuf, MAX_BUFFER_LEN);}}PER_IO_CONTEXT,*PPER_IO_CONTEXT;typedef struct _PER_SOCKET_CONTEXT
{SOCKET m_Socket;SOCKADDR_IN m_ClientAddr;std::deque<PER_IO_CONTEXT *> m_PerIoContextArr;_PER_SOCKET_CONTEXT(){m_Socket = INVALID_SOCKET;memset(&m_ClientAddr, 0, sizeof(m_ClientAddr));}~_PER_SOCKET_CONTEXT(){if (m_Socket != INVALID_SOCKET){closesocket(m_Socket);m_Socket = INVALID_SOCKET;}for (int i = 0; i < m_PerIoContextArr.size();i++){delete m_PerIoContextArr.at(i);}m_PerIoContextArr.clear();}_PER_IO_CONTEXT *GetNewIOContext(){_PER_IO_CONTEXT *p = new _PER_IO_CONTEXT;m_PerIoContextArr.push_back(p);return p;}void RemoveIOContext(_PER_IO_CONTEXT *pIoContext){assert(pIoContext != NULL);std::deque<PER_IO_CONTEXT *>::iterator iter = m_PerIoContextArr.begin();for (; iter != m_PerIoContextArr.end(); ++iter){if (*iter == pIoContext){iter = m_PerIoContextArr.erase(iter);break;}}}}PER_SOCKET_CONTEXT,*PPER_SOCKET_CONTEXT;class CIOCPModel;typedef struct _tagThreadParameter_WORKER
{CIOCPModel *pIOCPModel;int dwThreadNo;
}THREADPARAMS_WORKER, *PTHREADPARAMS_WORKER;class CIOCPModel
{
public:CIOCPModel();~CIOCPModel();public:bool start();void stop();bool LoadSocketLib();void unLoadSocketLib();std::string GetLocalIP();void setPort(const int &m_port);protected:bool _InitIOCPModel();bool _InitializeListenSocket();void _DeInitializeIOCPModel();bool _PostAccept(PER_IO_CONTEXT *PerContext);bool _PostRecv(PER_IO_CONTEXT *PerContext);bool _PostSend(PER_IO_CONTEXT *PerContext);bool _DoAccept(PER_SOCKET_CONTEXT *PerSocket, PER_IO_CONTEXT *PerContext);bool _DoRecv(PER_SOCKET_CONTEXT *PerSocket, PER_IO_CONTEXT *PerContext);void _AddToContextList(PER_SOCKET_CONTEXT *pSocketContext);void _RemoveContextList(PER_SOCKET_CONTEXT *pSocketContext);void _ClearContextList();bool _AssociateWithIOCP(PER_SOCKET_CONTEXT *pSocketContext);bool HandleError(PER_SOCKET_CONTEXT *pSocketContext, const DWORD &dwError);static DWORD __stdcall _WorkThread(LPVOID lpParam);int _GetNoOfProcessor();bool _IsSocketAlive(SOCKET s);
private:HANDLE m_hShutDownEvent;HANDLE m_hIOCompletionPort;HANDLE *m_WorkThread;int m_nThreads;std::string m_strIP;int m_port;CRITICAL_SECTION m_cs;std::deque<PER_SOCKET_CONTEXT *> m_ClientContextDeque;PER_SOCKET_CONTEXT *pListenSocketContext;LPFN_ACCEPTEX m_lpfnAcceptEx;LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockAddrs;};#endif