Linux小知识---关于socket的一些知识点
前言
网络编程离不开socket,学一些不常用的知识点
设置socket
设置和读取socket的配置可以通过下面两个接口
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>int getsockopt(int sockfd, int level, int optname,void *optval, socklen_t *optlen);
int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen);
sock:将要被设置或者获取选项的套接字。
level:选项所在的协议层。 level指定控制套接字的层次.可以取三种值:
SOL_SOCKET:通用套接字选项.
IPPROTO_IP:IP选项.
IPPROTO_TCP:TCP选项.
optname:需要访问的选项名。(下文中会有一个表)
optval:值,对于getsockopt(),指向返回选项值的缓冲;对于setsockopt(),指向包含新选项值的缓冲。
optlen:值长度,对于getsockopt(),作为入口参数时,选项值的最大长度;作为出口参数时,选项值的实际长度。对于setsockopt(),现选项的长度。
返回值: 成功执行时,返回0。失败返回-1,errno被设为以下的某个值 :
EBADF:sock不是有效的文件描述词
EFAULT:optval指向的内存并非有效的进程空间
EINVAL:在调用 setsockopt()时,optlen无效
ENOPROTOOPT:指定的协议层不能识别选项 ENOTSOCK:sock描述的不是套接字
optname 参数详细说明:
SOL_SOCKET
选项名称 | 说明 | 数据类型 |
---|---|---|
SO_BROADCAST | 允许发送广播数据 | int |
SO_DEBUG | 允许调试 | int |
SO_DONTROUTE | 不查找路由 | int |
SO_ERROR | 获得套接字错误 | int |
SO_KEEPALIVE | 保持连接 | int |
SO_LINGER | 延迟关闭连接 | struct linger |
SO_OOBINLINE | 带外数据放入正常数据流 | int |
SO_RCVBUF | 接收缓冲区大小 | int |
SO_SNDBUF | 发送缓冲区大小 | int |
SO_RCVLOWAT | 接收缓冲区下限 | int |
SO_SNDLOWAT | 发送缓冲区下限 | int |
SO_RCVTIMEO | 接收超时 | struct timeval |
SO_SNDTIMEO | 发送超时 | struct timeval |
SO_REUSERADDR | 允许重用本地地址和端口 | int |
SO_TYPE | 获得套接字类型 | int |
SO_BSDCOMPAT | 与BSD系统兼容 | int |
IPPROTO_IP
选项名称 | 说明 | 数据类型 |
---|---|---|
IP_HDRINCL | 在数据包中包含IP首部 | int |
IP_OPTINOS | IP首部选项 | int |
IP_TOS | 服务类型 | int |
IP_TTL | 生存时间 | int |
IPPRO_TCP
选项名称 | 说明 | 数据类型 |
---|---|---|
TCP_MAXSEG | TCP最大数据段的大小 | int |
TCP_NODELAY | 不使用Nagle算法 | int |
这些都是摘抄的
还是来看几个用法吧。
1. 设置发送和接收的超时
接收和发送数据都可以设置超时时间
方法
int set_sock_time(int fd, int read_sec, int write_sec)
{struct timeval send_timeval;struct timeval recv_timeval;if(fd <= 0) return -1;send_timeval.tv_sec = write_sec<0?0:write_sec;send_timeval.tv_usec = 0;recv_timeval.tv_sec = read_sec<0?0:read_sec;;recv_timeval.tv_usec = 0;if(setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &send_timeval, sizeof(send_timeval)) == -1){return -1;}if(setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &recv_timeval, sizeof(recv_timeval)) == -1){return -1;}return 0;
}
2. 设置缓冲区
接受和发送的缓冲区大小可以设置,在某些情况下可以减轻资源损耗
方法
int set_sock_bufsize(int fd,int read_size, int write_size)
{int nRecvBuf=read_size; //设置为32Kint nSendBuf=write_size; //设置为32K// 接收缓冲区if(setsockopt(fd,SOL_SOCKET,SO_RCVBUF,(const char*)&nRecvBuf,sizeof(int))== -1){return -1;}//发送缓冲区if(setsockopt(fd,SOL_SOCKET,SO_SNDBUF,(const char*)&nSendBuf,sizeof(int))== -1){return -1;}return 0;}
3. 延迟关闭
这里用到了下面的结构体数据
/* Structure used to manipulate the SO_LINGER option. */
struct linger{int l_onoff; /* Nonzero to linger on close. */int l_linger; /* Time to linger. */};
根据linger结构体中两个成员变量的不同值,close系统调用可能产生如下3种行为之一:
(1)l_onoff等于0(关闭)。此时SO_LINGER选项不起作用,close用默认行为来关闭socket。如果send buffer中没有数据,close立即返回,如果send buffer中还有数据,close将会等到所有数据被发送完之后之后返回(相当于通信文件描述符是阻塞的)。由于我们并没有等待对方TCP发送的ACK信息,所以我们只能保证数据已经发送到对方,我们并不知道对方是否已经接受了数据。由于此种情况,TCP连接终止是按照正常的4次握手方式,需要经过TIME_WAIT。
(2)l_onoff不为0(开启),l_linger等于0。无论发送缓存区是否有数据,close系统调用立即返回,TCP模块将丢弃被关闭的socket对应的TCP发送缓冲区中残留的数据,同时给对方发送一个复位报文段(RST)。因此,这种情况给服务器提供了异常终止一个连接的方法。
(3)l_onoff不为0(开启),l_linger不等于0,实现优雅关闭的允许时间。如果套接口缓冲区中仍残留数据,进程将处于睡眠状态,直到所有数据发送完且被对方确认,之后进行正常的终止序列(描述字访问计数为0)或延迟时间l_linger到。此种情况下,应用程序检查close的返回值是非常重要的,如果在数据发送完并被确认之前l_linger时间到,close将返回EWOULDBLOCK错误且套接口发送缓冲区中的任何数据都丢失。close的成功返回仅告诉我们发送的数据(和FIN)已由对方TCP确认,它并不能告诉我们对方应用进程是否已读了数据。如果套接口设为非阻塞的,程序将不会等待close返回,send buffer中的所有数据都将会被丢弃,并将以WSAEWOULDBLOCK错误返回。
int set_sock_closelater(int fd,int onoff,int linger)
{struct linger linger_setvalue;linger_setvalue.l_onoff = onoff;linger_setvalue.l_linger = linger;if(setsockopt(fd, SOL_SOCKET, SO_LINGER, (char *)&linger_setvalue, sizeof(linger_setvalue))== -1){return -1;}return 0;
}
4. 重启服务
以前写服务端,在bind函数调用时候经常会遇到下面的报错提示
Address already in use
我们可以看到。在不成功的时候,端口监听状态如下
[root@localhost ~]# netstat -tna | grep 80
tcp 0 0 192.168.32.94:80 192.168.31.2:1061 TIME_WAIT
一般来说,一个端口释放后会等待两分钟之后才能再被使用,TCP先调用close()的一方会进入TIME_WAIT状态。
SO_REUSEADDR是让端口释放后立即就可以被再次使用。
SO_REUSEADDR用于对TCP套接字处于TIME_WAIT状态下的socket,才可以重复绑定使用。server程序总是应该在调用bind()之前设置SO_REUSEADDR套接字选项。
方法
int set_sock_reuse(int fd)
{int on=1; if((setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on)))<0) {return -1;}return 0;
}
5. 设置心跳
但是这个方法网上都不推荐,学习一下就行了
/* Set TCP keep alive option to detect dead peers. The interval option* is only used for Linux as we are using Linux-specific APIs to set* the probe send time, interval, and count. */
int anetKeepAlive(char *err, int fd, int interval)
{int val = 1;//开启keepalive机制if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) == -1){anetSetError(err, "setsockopt SO_KEEPALIVE: %s", strerror(errno));return ANET_ERR;}#ifdef __linux__/* Default settings are more or less garbage, with the keepalive time* set to 7200 by default on Linux. Modify settings to make the feature* actually useful. *//* Send first probe after interval. */val = interval;if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &val, sizeof(val)) < 0) {anetSetError(err, "setsockopt TCP_KEEPIDLE: %s\n", strerror(errno));return ANET_ERR;}/* Send next probes after the specified interval. Note that we set the* delay as interval / 3, as we send three probes before detecting* an error (see the next setsockopt call). */val = interval/3;if (val == 0) val = 1;if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &val, sizeof(val)) < 0) {anetSetError(err, "setsockopt TCP_KEEPINTVL: %s\n", strerror(errno));return ANET_ERR;}/* Consider the socket in error state after three we send three ACK* probes without getting a reply. */val = 3;if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &val, sizeof(val)) < 0) {anetSetError(err, "setsockopt TCP_KEEPCNT: %s\n", strerror(errno));return ANET_ERR;}
#endifreturn ANET_OK;
}
Socket多线程
socket编程的多线程,是在接收处理客户端消息的时候,采用子线程进行单独处理,并且线程分离,处理结束后,自动释放自己的资源。
服务端代码
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <getopt.h>
#include <pthread.h>
#include <ctype.h>#define BACKLOG 13
#define SERVER_PORT 80
static int set_sock_time(int fd, int read_sec, int write_sec)
{struct timeval send_timeval;struct timeval recv_timeval;if(fd <= 0){printf("[ERROR]socket参数错误");return -1;}send_timeval.tv_sec = write_sec<0?0:write_sec;send_timeval.tv_usec = 0;recv_timeval.tv_sec = read_sec<0?0:read_sec;;recv_timeval.tv_usec = 0;if(setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &send_timeval, sizeof(send_timeval)) == -1){printf("[ERROR]socket配置发送超时失败[%s]",strerror(errno));return -1;}if(setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &recv_timeval, sizeof(recv_timeval)) == -1){printf("[ERROR]socket配置接收超时失败[%s]",strerror(errno));return -1;}return 0;
}static int set_sock_bufsize(int fd,int read_size, int write_size)
{int nRecvBuf=read_size; //设置为32Kint nSendBuf=write_size; //设置为32K// 接收缓冲区if(setsockopt(fd,SOL_SOCKET,SO_RCVBUF,(const char*)&nRecvBuf,sizeof(int))== -1){printf("[ERROR]socket配置接收缓冲区大小失败[%s]",strerror(errno));return -1;}//发送缓冲区if(setsockopt(fd,SOL_SOCKET,SO_SNDBUF,(const char*)&nSendBuf,sizeof(int))== -1){printf("[ERROR]socket配置发送缓冲区大小失败[%s]",strerror(errno));return -1;}return 0;
}static int set_sock_closelater(int fd,int onoff,int linger)
{struct linger linger_setvalue;linger_setvalue.l_onoff = onoff;linger_setvalue.l_linger = linger;if(setsockopt(fd, SOL_SOCKET, SO_LINGER, (char *)&linger_setvalue, sizeof(linger_setvalue))== -1){printf("[ERROR]socket配置延迟关闭失败[%s]",strerror(errno));return -1;}return 0;
}static int set_sock_reuse(int fd)
{int on=1; if((setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on)))<0) {printf("[ERROR]socket配置重复使用失败[%s]",strerror(errno));return -1;}return 0;
}static int socket_init(char *listen_ip,int listen_port)
{int listenfd;struct sockaddr_in servaddr; if((listenfd=socket(AF_INET,SOCK_STREAM,0))<0){printf("[ERROR]服务端创建TCPsocket失败[%s]\n",strerror(errno));return -1;}printf("服务端创建TCPsocket[%d]成功\n",listenfd);if(set_sock_reuse(listenfd)<0) {return -2;}memset(&servaddr,0,sizeof(servaddr));servaddr.sin_family=AF_INET;servaddr.sin_port=htons(listen_port);if(!listen_ip){servaddr.sin_addr.s_addr=htonl(INADDR_ANY);}else { servaddr.sin_addr.s_addr=inet_addr(listen_ip);} if(bind(listenfd,(struct sockaddr *)&servaddr,sizeof(servaddr))<0){ printf("[ERROR]socket[%d]绑定端口[%d]失败[%s]\n",listenfd,listen_port,strerror(errno));return -2;}printf("socket[%d]绑定端口[%d]成功\n",listenfd,listen_port);listen(listenfd,BACKLOG);printf("开始监听端口[%d]\n",listen_port);return listenfd;
}static int thread_init(pthread_t *thread_id,void *(*start_routine) (void *),void *arg)/*start_route是一个函数指针,指向返回值为void *类型,参数也是void类型的函数*/
{pthread_attr_t thread_attr;int rv = -1;if( pthread_attr_init(&thread_attr) )/*设置线程属性*/{printf("[ERROR]设置线程属性失败[%s]\n", strerror(errno));return -1;;}if( pthread_attr_setstacksize(&thread_attr, 120*1024) ){printf("[ERROR]设置栈大小失败[%s]\n", strerror(errno));return -2;}if( pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED) ){printf("[ERROR]设置分离属性失败[%s]\n", strerror(errno));return -3;}if( pthread_create(thread_id, &thread_attr, start_routine, arg) )/*创建子线程并执行start_routine函数*/{printf("[ERROR]创建线程失败[%s]\n", strerror(errno));return -4;}return 0;
} static void *thread_worker(void *ctx)/*子线程处理和客户端数据交换的处理函数*/
{int cli_fd;int rv;char buf[1024];int i;if( !ctx ){printf("[ERROR]客户端socket无效,线程退出\n");pthread_exit(NULL);}cli_fd = *(int *)ctx;/*强制类型转换成int *类型的并把ctx的值赋值给cli_fd*/printf("子线程开始通信\n");while(1){memset(buf, 0, sizeof(buf));rv=read(cli_fd, buf, sizeof(buf));if( rv < 0){printf("[ERROR]读取客户端[%d]数据失败[%s],线程退出\n",cli_fd,strerror(errno));close(cli_fd);pthread_exit(NULL);}else if( rv == 0){printf("客户端[%d]连接结束 ,线程退出\n",cli_fd);close(cli_fd);pthread_exit(NULL);}else if( rv > 0 ){printf("从客户端[%d]读取[%d]字节内容[%s]\n",cli_fd, rv, buf);}/* convert letter from lowercase to uppercase */for(i=0; i<rv; i++)/*收到client发送的数据后把它转换成大写字母并发送给客户端*/{buf[i]=toupper(buf[i]);}rv=write(cli_fd, buf, rv);if(rv < 0){printf("[ERROR]客户端[%d]应答失败 ,线程退出\n", cli_fd,strerror(errno));close(cli_fd);pthread_exit(NULL);}}}int main(int argc,char **argv)
{int listen_fd;int clifd=-1;int rv=-2;int opt;struct sockaddr_in cliaddr;socklen_t cliaddr_len =sizeof(cliaddr) ;pthread_t tid;if((listen_fd=socket_init(NULL,SERVER_PORT))<0)/*socket初始化返回一个listenfd*/{printf("socket初始化失败:%s",strerror(errno));return -2;}while(1){printf("开始等待新连接……\n");clifd=accept(listen_fd, (struct sockaddr *)&cliaddr, &cliaddr_len);/*主线程接收新的客户端的连接*/if(clifd < 0){printf("连接新客户端失败: %s\n", strerror(errno));continue;}printf("新客户端接入[%s:%d]成功\n", inet_ntoa(cliaddr.sin_addr),ntohs(cliaddr.sin_port));thread_init(&tid, thread_worker, &clifd);/*创建子线程并让子线程和client进行数据的收发*/}
}
这里客户端用了一个工具
[root@localhost test]# ./socket_pthread
服务端创建TCPsocket[3]成功
socket[3]绑定端口[80]成功
开始监听端口[80]
开始等待新连接……
新客户端接入[192.168.31.2:5784]成功
开始等待新连接……
子线程开始通信
新客户端接入[192.168.31.2:5785]成功
开始等待新连接……
子线程开始通信
新客户端接入[192.168.31.2:5786]成功
开始等待新连接……
子线程开始通信
从客户端[4]读取[4]字节内容[2345]
从客户端[6]读取[4]字节内容[2345]
从客户端[5]读取[4]字节内容[2345]
从客户端[4]读取[4]字节内容[2345]
从客户端[6]读取[4]字节内容[2345]
从客户端[5]读取[4]字节内容[2345]
客户端[6]连接结束 ,线程退出
客户端[5]连接结束 ,线程退出
客户端[4]连接结束 ,线程退出
线程池
都说线程池高级,那不妨用一下结合起来。
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <getopt.h>
#include <pthread.h>
#include <ctype.h>#define BACKLOG 13
#define SERVER_PORT 80// 任务结构体
typedef struct Task
{void (*function)(void* arg);void* arg;
}Task;
// 线程池结构体
struct ThreadPool
{// 任务队列Task* taskQ;int queueCapacity; // 容量int queueSize; // 当前任务个数int queueFront; // 队头 -> 取数据int queueRear; // 队尾 -> 放数据pthread_t managerID; // 管理者线程IDpthread_t *threadIDs; // 工作的线程IDint minNum; // 最小线程数量int maxNum; // 最大线程数量int busyNum; // 忙的线程的个数int liveNum; // 存活的线程的个数int exitNum; // 要销毁的线程个数pthread_mutex_t mutexPool; // 锁整个的线程池pthread_cond_t notFull; // 任务队列是不是满了pthread_cond_t notEmpty; // 任务队列是不是空了pthread_mutex_t mutexBusy; // 锁busyNum变量int shutdown; // 是不是要销毁线程池, 销毁为1, 不销毁为0
};
typedef struct ThreadPool ThreadPool;void threadExit(ThreadPool* pool)
{pthread_t tid = pthread_self();for (int i = 0; i < pool->maxNum; ++i){if (pool->threadIDs[i] == tid){pool->threadIDs[i] = 0;printf("threadExit() called, %ld exiting...\n", tid);break;}}pthread_exit(NULL);
}void* worker(void* arg)
{ThreadPool* pool = (ThreadPool*)arg;while (1){pthread_mutex_lock(&pool->mutexPool);// 当前任务队列是否为空while (pool->queueSize == 0 && !pool->shutdown){// 阻塞工作线程pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);// 判断是不是要销毁线程if (pool->exitNum > 0){pool->exitNum--;if (pool->liveNum > pool->minNum){pool->liveNum--;pthread_mutex_unlock(&pool->mutexPool);threadExit(pool);}}}// 判断线程池是否被关闭了if (pool->shutdown){pthread_mutex_unlock(&pool->mutexPool);threadExit(pool);}// 从任务队列中取出一个任务Task task;task.function = pool->taskQ[pool->queueFront].function;task.arg = pool->taskQ[pool->queueFront].arg;// 移动头结点pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;pool->queueSize--;// 解锁pthread_cond_signal(&pool->notFull);pthread_mutex_unlock(&pool->mutexPool);printf("thread %ld start working...\n", pthread_self());pthread_mutex_lock(&pool->mutexBusy);pool->busyNum++;pthread_mutex_unlock(&pool->mutexBusy);task.function(task.arg);//free(task.arg);task.arg = NULL;printf("thread %ld end working...\n", pthread_self());pthread_mutex_lock(&pool->mutexBusy);pool->busyNum--;pthread_mutex_unlock(&pool->mutexBusy);}return NULL;
}void* manager(void* arg)
{ThreadPool* pool = (ThreadPool*)arg;while (!pool->shutdown){// 每隔3s检测一次sleep(3);// 取出线程池中任务的数量和当前线程的数量pthread_mutex_lock(&pool->mutexPool);int queueSize = pool->queueSize;int liveNum = pool->liveNum;pthread_mutex_unlock(&pool->mutexPool);// 取出忙的线程的数量pthread_mutex_lock(&pool->mutexBusy);int busyNum = pool->busyNum;pthread_mutex_unlock(&pool->mutexBusy);const int NUMBER = 2;// 添加线程// 任务的个数>存活的线程个数 && 存活的线程数<最大线程数if (queueSize > liveNum && liveNum < pool->maxNum){pthread_mutex_lock(&pool->mutexPool);int counter = 0;for (int i = 0; i < pool->maxNum && counter < NUMBER&& pool->liveNum < pool->maxNum; ++i){if (pool->threadIDs[i] == 0){pthread_create(&pool->threadIDs[i], NULL, worker, pool);counter++;pool->liveNum++;}}pthread_mutex_unlock(&pool->mutexPool);}// 销毁线程// 忙的线程*2 < 存活的线程数 && 存活的线程>最小线程数if (busyNum * 2 < liveNum && liveNum > pool->minNum){pthread_mutex_lock(&pool->mutexPool);pool->exitNum = NUMBER;pthread_mutex_unlock(&pool->mutexPool);// 让工作的线程自杀for (int i = 0; i < NUMBER; ++i){pthread_cond_signal(&pool->notEmpty);}}}return NULL;
}//min 最少
//max 最多线程数
//queueSize 任务队列大小
ThreadPool* threadPoolCreate(int min, int max, int queueSize)
{ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));do {if (pool == NULL){printf("malloc threadpool fail...\n");break;}//申请max个phtread_t 线程idpool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);if (pool->threadIDs == NULL){printf("malloc threadIDs fail...\n");break;}memset(pool->threadIDs, 0, sizeof(pthread_t) * max);pool->minNum = min;pool->maxNum = max;pool->busyNum = 0; //在忙的线程pool->liveNum = min; //和最小个数相等pool->exitNum = 0; //需要销毁的//初始化锁和条件变量if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||pthread_cond_init(&pool->notEmpty, NULL) != 0 ||pthread_cond_init(&pool->notFull, NULL) != 0){printf("mutex or condition init fail...\n");break;}// 任务队列pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);pool->queueCapacity = queueSize;pool->queueSize = 0; //当前任务数pool->queueFront = 0; //队头pool->queueRear = 0; //队尾pool->shutdown = 0; //线程池启动销毁// 创建管理线程pthread_create(&pool->managerID, NULL, manager, pool);// 创建工作线程,先创建min个线程for (int i = 0; i < min; ++i){pthread_create(&pool->threadIDs[i], NULL, worker, pool);}return pool;} while (0);// 释放资源if (pool && pool->threadIDs) free(pool->threadIDs);if (pool && pool->taskQ) free(pool->taskQ);if (pool) free(pool);return NULL;
}
int threadPoolDestroy(ThreadPool* pool)
{if (pool == NULL){return -1;}// 关闭线程池pool->shutdown = 1;// 阻塞回收管理者线程pthread_join(pool->managerID, NULL);// 唤醒阻塞的消费者线程for (int i = 0; i < pool->liveNum; ++i){pthread_cond_signal(&pool->notEmpty);}// 释放堆内存if (pool->taskQ){free(pool->taskQ);}if (pool->threadIDs){free(pool->threadIDs);}pthread_mutex_destroy(&pool->mutexPool);pthread_mutex_destroy(&pool->mutexBusy);pthread_cond_destroy(&pool->notEmpty);pthread_cond_destroy(&pool->notFull);free(pool);pool = NULL;return 0;
}void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{pthread_mutex_lock(&pool->mutexPool);while (pool->queueSize == pool->queueCapacity && !pool->shutdown){// 阻塞生产者线程pthread_cond_wait(&pool->notFull, &pool->mutexPool);}if (pool->shutdown){pthread_mutex_unlock(&pool->mutexPool);return;}// 添加任务pool->taskQ[pool->queueRear].function = func;pool->taskQ[pool->queueRear].arg = arg;pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;pool->queueSize++;pthread_cond_signal(&pool->notEmpty);pthread_mutex_unlock(&pool->mutexPool);
}int threadPoolBusyNum(ThreadPool* pool)
{pthread_mutex_lock(&pool->mutexBusy);int busyNum = pool->busyNum;pthread_mutex_unlock(&pool->mutexBusy);return busyNum;
}int threadPoolAliveNum(ThreadPool* pool)
{pthread_mutex_lock(&pool->mutexPool);int aliveNum = pool->liveNum;pthread_mutex_unlock(&pool->mutexPool);return aliveNum;
}static int set_sock_time(int fd, int read_sec, int write_sec)
{struct timeval send_timeval;struct timeval recv_timeval;if(fd <= 0){printf("[ERROR]socket参数错误");return -1;}send_timeval.tv_sec = write_sec<0?0:write_sec;send_timeval.tv_usec = 0;recv_timeval.tv_sec = read_sec<0?0:read_sec;;recv_timeval.tv_usec = 0;if(setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &send_timeval, sizeof(send_timeval)) == -1){printf("[ERROR]socket配置发送超时失败[%s]",strerror(errno));return -1;}if(setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &recv_timeval, sizeof(recv_timeval)) == -1){printf("[ERROR]socket配置接收超时失败[%s]",strerror(errno));return -1;}return 0;}static int set_sock_bufsize(int fd,int read_size, int write_size)
{int nRecvBuf=read_size; //设置为32Kint nSendBuf=write_size; //设置为32K// 接收缓冲区if(setsockopt(fd,SOL_SOCKET,SO_RCVBUF,(const char*)&nRecvBuf,sizeof(int))== -1){printf("[ERROR]socket配置接收缓冲区大小失败[%s]",strerror(errno));return -1;}//发送缓冲区if(setsockopt(fd,SOL_SOCKET,SO_SNDBUF,(const char*)&nSendBuf,sizeof(int))== -1){printf("[ERROR]socket配置发送缓冲区大小失败[%s]",strerror(errno));return -1;}return 0;}static int set_sock_closelater(int fd,int onoff,int linger)
{struct linger linger_setvalue;linger_setvalue.l_onoff = onoff;linger_setvalue.l_linger = linger;if(setsockopt(fd, SOL_SOCKET, SO_LINGER, (char *)&linger_setvalue, sizeof(linger_setvalue))== -1){printf("[ERROR]socket配置延迟关闭失败[%s]",strerror(errno));return -1;}return 0;
}static int set_sock_reuse(int fd)
{int on=1; if((setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on)))<0) {printf("[ERROR]socket配置重复使用失败[%s]",strerror(errno));return -1;}return 0;
}static int socket_init(char *listen_ip,int listen_port)
{int listenfd;struct sockaddr_in servaddr; if((listenfd=socket(AF_INET,SOCK_STREAM,0))<0){printf("[ERROR]服务端创建TCPsocket失败[%s]\n",strerror(errno));return -1;}printf("服务端创建TCPsocket[%d]成功\n",listenfd);if(set_sock_reuse(listenfd)<0) {return -2;}memset(&servaddr,0,sizeof(servaddr));servaddr.sin_family=AF_INET;servaddr.sin_port=htons(listen_port);if(!listen_ip){servaddr.sin_addr.s_addr=htonl(INADDR_ANY);}else { servaddr.sin_addr.s_addr=inet_addr(listen_ip);} if(bind(listenfd,(struct sockaddr *)&servaddr,sizeof(servaddr))<0){ printf("[ERROR]socket[%d]绑定端口[%d]失败[%s]\n",listenfd,listen_port,strerror(errno));return -2;}printf("socket[%d]绑定端口[%d]成功\n",listenfd,listen_port);listen(listenfd,BACKLOG);printf("开始监听端口[%d]\n",listen_port);return listenfd;
}static int thread_init(pthread_t *thread_id,void *(*start_routine) (void *),void *arg)/*start_route是一个函数指针,指向返回值为void *类型,参数也是void类型的函数*/
{pthread_attr_t thread_attr;int rv = -1;if( pthread_attr_init(&thread_attr) )/*设置线程属性*/{printf("[ERROR]设置线程属性失败[%s]\n", strerror(errno));return -1;;}if( pthread_attr_setstacksize(&thread_attr, 120*1024) ){printf("[ERROR]设置栈大小失败[%s]\n", strerror(errno));return -2;}if( pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED) ){printf("[ERROR]设置分离属性失败[%s]\n", strerror(errno));return -3;}if( pthread_create(thread_id, &thread_attr, start_routine, arg) )/*创建子线程并执行start_routine函数*/{printf("[ERROR]创建线程失败[%s]\n", strerror(errno));return -4;}return 0;
} static void *thread_worker(void *ctx)/*子线程处理和客户端数据交换的处理函数*/
{int cli_fd;int rv;char buf[1024];int i;if( !ctx ){printf("[ERROR]客户端socket无效,线程退出\n");pthread_exit(NULL);}cli_fd = *(int *)ctx;/*强制类型转换成int *类型的并把ctx的值赋值给cli_fd*/printf("子线程开始通信\n");while(1){memset(buf, 0, sizeof(buf));rv=read(cli_fd, buf, sizeof(buf));if( rv < 0){printf("[ERROR]读取客户端[%d]数据失败[%s],线程退出\n",cli_fd,strerror(errno));close(cli_fd);//pthread_exit(NULL);break;}else if(rv == 0){printf("客户端[%d]连接结束 ,线程退出\n",cli_fd);close(cli_fd);//pthread_exit(NULL);break;}else if( rv > 0 ){printf("从客户端[%d]读取[%d]字节内容[%s]\n",cli_fd, rv, buf);}/* convert letter from lowercase to uppercase */for(i=0; i<rv; i++)/*收到client发送的数据后把它转换成大写字母并发送给客户端*/{buf[i]=toupper(buf[i]);}rv=write(cli_fd, buf, rv);if(rv < 0){printf("[ERROR]客户端[%d]应答失败 ,线程退出\n", cli_fd,strerror(errno));close(cli_fd);//pthread_exit(NULL);break;}}}int main(int argc,char **argv)
{int listen_fd;int clifd=-1;int rv=-2;int opt;struct sockaddr_in cliaddr;socklen_t cliaddr_len =sizeof(cliaddr) ;pthread_t tid;// 创建线程池ThreadPool* pool = threadPoolCreate(3, 10, 100);if((listen_fd=socket_init(NULL,SERVER_PORT))<0)/*socket初始化返回一个listenfd*/{printf("socket初始化失败:%s",strerror(errno));return -2;}while(1){printf("开始等待新连接……\n");clifd=accept(listen_fd, (struct sockaddr *)&cliaddr, &cliaddr_len);/*主线程接收新的客户端的连接*/if(clifd < 0){printf("连接新客户端失败: %s\n", strerror(errno));continue;}printf("新客户端接入[%s:%d]成功\n", inet_ntoa(cliaddr.sin_addr),ntohs(cliaddr.sin_port));//thread_init(&tid, thread_worker, &clifd);/*创建子线程并让子线程和client进行数据的收发*/threadPoolAdd(pool, thread_worker, &clifd);}}
不过这种需要循环的客户端,立马就占满了3个处理线程,第四个加入的时候,就需要等待条件变量了。如果是那种短暂连接,发送个消息就结束的情况,应该就比较合适了
epoll+多线程
epoll可以高效处理socket连接,那么用来代替accept或者select自然是更好,配合线程池处理业务,来实现高并发快速处理
#include<stdio.h>
#include<stdlib.h>
#include<assert.h>
#include<unistd.h>
#include<string.h>
#include <errno.h>
#include<sys/types.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<sys/socket.h>
#include<sys/epoll.h>
#include <pthread.h>
#include <ctype.h>#define MAX 1024
#define listen_port 80// 任务结构体
typedef struct Task
{void (*function)(void* arg);void* arg;
}Task;
// 线程池结构体
struct ThreadPool
{// 任务队列Task* taskQ;int queueCapacity; // 容量int queueSize; // 当前任务个数int queueFront; // 队头 -> 取数据int queueRear; // 队尾 -> 放数据pthread_t managerID; // 管理者线程IDpthread_t *threadIDs; // 工作的线程IDint minNum; // 最小线程数量int maxNum; // 最大线程数量int busyNum; // 忙的线程的个数int liveNum; // 存活的线程的个数int exitNum; // 要销毁的线程个数pthread_mutex_t mutexPool; // 锁整个的线程池pthread_cond_t notFull; // 任务队列是不是满了pthread_cond_t notEmpty; // 任务队列是不是空了pthread_mutex_t mutexBusy; // 锁busyNum变量int shutdown; // 是不是要销毁线程池, 销毁为1, 不销毁为0
};
typedef struct ThreadPool ThreadPool;void threadExit(ThreadPool* pool)
{pthread_t tid = pthread_self();for (int i = 0; i < pool->maxNum; ++i){if (pool->threadIDs[i] == tid){pool->threadIDs[i] = 0;printf("threadExit() called, %ld exiting...\n", tid);break;}}pthread_exit(NULL);
}void* worker(void* arg)
{ThreadPool* pool = (ThreadPool*)arg;while (1){pthread_mutex_lock(&pool->mutexPool);// 当前任务队列是否为空while (pool->queueSize == 0 && !pool->shutdown){// 阻塞工作线程pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);// 判断是不是要销毁线程if (pool->exitNum > 0){pool->exitNum--;if (pool->liveNum > pool->minNum){pool->liveNum--;pthread_mutex_unlock(&pool->mutexPool);threadExit(pool);}}}// 判断线程池是否被关闭了if (pool->shutdown){pthread_mutex_unlock(&pool->mutexPool);threadExit(pool);}// 从任务队列中取出一个任务Task task;task.function = pool->taskQ[pool->queueFront].function;task.arg = pool->taskQ[pool->queueFront].arg;// 移动头结点pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;pool->queueSize--;// 解锁pthread_cond_signal(&pool->notFull);pthread_mutex_unlock(&pool->mutexPool);printf("thread %ld start working...\n", pthread_self());pthread_mutex_lock(&pool->mutexBusy);pool->busyNum++;pthread_mutex_unlock(&pool->mutexBusy);task.function(task.arg);//free(task.arg);task.arg = NULL;printf("thread %ld end working...\n", pthread_self());pthread_mutex_lock(&pool->mutexBusy);pool->busyNum--;pthread_mutex_unlock(&pool->mutexBusy);}return NULL;
}void* manager(void* arg)
{ThreadPool* pool = (ThreadPool*)arg;while (!pool->shutdown){// 每隔3s检测一次sleep(3);// 取出线程池中任务的数量和当前线程的数量pthread_mutex_lock(&pool->mutexPool);int queueSize = pool->queueSize;int liveNum = pool->liveNum;pthread_mutex_unlock(&pool->mutexPool);// 取出忙的线程的数量pthread_mutex_lock(&pool->mutexBusy);int busyNum = pool->busyNum;pthread_mutex_unlock(&pool->mutexBusy);const int NUMBER = 2;// 添加线程// 任务的个数>存活的线程个数 && 存活的线程数<最大线程数if (queueSize > liveNum && liveNum < pool->maxNum){pthread_mutex_lock(&pool->mutexPool);int counter = 0;for (int i = 0; i < pool->maxNum && counter < NUMBER&& pool->liveNum < pool->maxNum; ++i){if (pool->threadIDs[i] == 0){pthread_create(&pool->threadIDs[i], NULL, worker, pool);counter++;pool->liveNum++;}}pthread_mutex_unlock(&pool->mutexPool);}// 销毁线程// 忙的线程*2 < 存活的线程数 && 存活的线程>最小线程数if (busyNum * 2 < liveNum && liveNum > pool->minNum){pthread_mutex_lock(&pool->mutexPool);pool->exitNum = NUMBER;pthread_mutex_unlock(&pool->mutexPool);// 让工作的线程自杀for (int i = 0; i < NUMBER; ++i){pthread_cond_signal(&pool->notEmpty);}}}return NULL;
}//min 最少
//max 最多线程数
//queueSize 任务队列大小
ThreadPool* threadPoolCreate(int min, int max, int queueSize)
{ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));do {if (pool == NULL){printf("malloc threadpool fail...\n");break;}//申请max个phtread_t 线程idpool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);if (pool->threadIDs == NULL){printf("malloc threadIDs fail...\n");break;}memset(pool->threadIDs, 0, sizeof(pthread_t) * max);pool->minNum = min;pool->maxNum = max;pool->busyNum = 0; //在忙的线程pool->liveNum = min; //和最小个数相等pool->exitNum = 0; //需要销毁的//初始化锁和条件变量if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||pthread_cond_init(&pool->notEmpty, NULL) != 0 ||pthread_cond_init(&pool->notFull, NULL) != 0){printf("mutex or condition init fail...\n");break;}// 任务队列pool->taskQ = (Task*)malloc(sizeof(Task) * queueSize);pool->queueCapacity = queueSize;pool->queueSize = 0; //当前任务数pool->queueFront = 0; //队头pool->queueRear = 0; //队尾pool->shutdown = 0; //线程池启动销毁// 创建管理线程pthread_create(&pool->managerID, NULL, manager, pool);// 创建工作线程,先创建min个线程for (int i = 0; i < min; ++i){pthread_create(&pool->threadIDs[i], NULL, worker, pool);}return pool;} while (0);// 释放资源if (pool && pool->threadIDs) free(pool->threadIDs);if (pool && pool->taskQ) free(pool->taskQ);if (pool) free(pool);return NULL;
}
int threadPoolDestroy(ThreadPool* pool)
{if (pool == NULL){return -1;}// 关闭线程池pool->shutdown = 1;// 阻塞回收管理者线程pthread_join(pool->managerID, NULL);// 唤醒阻塞的消费者线程for (int i = 0; i < pool->liveNum; ++i){pthread_cond_signal(&pool->notEmpty);}// 释放堆内存if (pool->taskQ){free(pool->taskQ);}if (pool->threadIDs){free(pool->threadIDs);}pthread_mutex_destroy(&pool->mutexPool);pthread_mutex_destroy(&pool->mutexBusy);pthread_cond_destroy(&pool->notEmpty);pthread_cond_destroy(&pool->notFull);free(pool);pool = NULL;return 0;
}void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg)
{pthread_mutex_lock(&pool->mutexPool);while (pool->queueSize == pool->queueCapacity && !pool->shutdown){// 阻塞生产者线程pthread_cond_wait(&pool->notFull, &pool->mutexPool);}if (pool->shutdown){pthread_mutex_unlock(&pool->mutexPool);return;}// 添加任务pool->taskQ[pool->queueRear].function = func;pool->taskQ[pool->queueRear].arg = arg;pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;pool->queueSize++;pthread_cond_signal(&pool->notEmpty);pthread_mutex_unlock(&pool->mutexPool);
}int threadPoolBusyNum(ThreadPool* pool)
{pthread_mutex_lock(&pool->mutexBusy);int busyNum = pool->busyNum;pthread_mutex_unlock(&pool->mutexBusy);return busyNum;
}int threadPoolAliveNum(ThreadPool* pool)
{pthread_mutex_lock(&pool->mutexPool);int aliveNum = pool->liveNum;pthread_mutex_unlock(&pool->mutexPool);return aliveNum;
}static void *thread_worker(void *ctx)/*子线程处理和客户端数据交换的处理函数*/
{int cli_fd;int rv;char buf[1024];int i;if( !ctx ){printf("[ERROR]客户端socket无效,线程退出\n");pthread_exit(NULL);}cli_fd = *(int *)ctx;/*强制类型转换成int *类型的并把ctx的值赋值给cli_fd*/printf("子线程开始通信\n");memset(buf, 0, sizeof(buf));rv=read(cli_fd, buf, sizeof(buf));if( rv < 0){printf("[ERROR]读取客户端[%d]数据失败[%s],线程退出\n",cli_fd,strerror(errno));close(cli_fd);}else if(rv == 0){printf("客户端[%d]连接结束 ,线程退出\n",cli_fd);close(cli_fd);}else if( rv > 0 ){printf("从客户端[%d]读取[%d]字节内容[%s]\n",cli_fd, rv, buf);}
}int main()
{int listenfd=socket(AF_INET,SOCK_STREAM,0); assert(listenfd!=-1);struct sockaddr_in ser,cli;ser.sin_family=AF_INET;ser.sin_port=htons(listen_port);ser.sin_addr.s_addr=htonl(INADDR_ANY);//inet_addr("127.0.0.1");int res=bind(listenfd,(struct sockaddr*)&ser,sizeof(ser));assert(res!=-1);listen(listenfd,5);int epfd=epoll_create(1); //创建内核事件表epfdassert(epfd!=-1);struct epoll_event ev; ev.events=EPOLLIN;ev.data.fd=listenfd; //初始化一个关于listenfd的event结构体res=epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev); //将关于listenfd的结构体放入内核事件表assert(res!=-1);struct epoll_event event[MAX]; //下面epoll_wait()要将就绪事件都放入该数组中返回回来// 创建线程池ThreadPool* pool = threadPoolCreate(10, 100, 100);while(1){int n=epoll_wait(epfd,event,MAX,-1); //核心函数;返回就绪文件描述符个数if(n==-1) {printf("error!\n");exit(0);}if(n==0){printf("timeout\n");continue;}int i=0;for(;i<n;++i){int fd=event[i].data.fd;if(event[i].events & EPOLLRDHUP) //cli输入“end”{printf("break\n");close(fd);epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL); //将关于fd的结构体从epfd中删除continue;}if(event[i].events & EPOLLIN) {if(fd==listenfd){int len=sizeof(cli);int c=accept(listenfd,(struct sockaddr*)&cli,&len);assert(c!=-1);printf("link succese\n");ev.events= EPOLLIN|EPOLLET;//EPOLLIN|EPOLLRDHUP;ev.data.fd=c;res=epoll_ctl(epfd,EPOLL_CTL_ADD,c,&ev);assert(res!=-1);}else{threadPoolAdd(pool, thread_worker, &fd);}}}}
}
结束语
今天是国庆前最后一天
等了很久的放假,突然感觉也没那么有意思,回家也回不去,总是临近节假日,就开始有疫情。
虽然好多资本主义国家都宣布疫情结束了,他们一定是在骗自己,
再工作一会吧,站好最后一班岗