
Linux tidbits --- a few things about sockets

Preface

Network programming is inseparable from sockets, so here are a few less commonly used points worth knowing.

Configuring a socket

Socket options can be set and read through the following two interfaces:

#include <sys/types.h>          /* See NOTES */
#include <sys/socket.h>

int getsockopt(int sockfd, int level, int optname, void *optval, socklen_t *optlen);
int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen);

sockfd: the socket whose option is being set or queried.
level: the protocol level at which the option lives. Three values are common:

SOL_SOCKET: generic socket-level options.
IPPROTO_IP: IP-level options.
IPPROTO_TCP: TCP-level options.

optname: the option to access (see the tables below).
optval: for getsockopt(), points to the buffer that receives the option value; for setsockopt(), points to the buffer holding the new value.
optlen: for getsockopt(), the maximum size of the buffer on input and the actual size of the returned value on output; for setsockopt(), the size of the new value.

Return value: 0 on success. On failure, -1 is returned and errno is set to one of:

EBADF: sockfd is not a valid file descriptor.
EFAULT: optval does not point into a valid part of the process address space.
EINVAL: optlen is invalid for this setsockopt() call.
ENOPROTOOPT: the option is unknown at the given level.
ENOTSOCK: sockfd does not refer to a socket.
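
Before the option tables, here is a minimal sketch (not from the original article) of the getsockopt() side: it queries the socket type and the current receive buffer size of an already-created socket fd. The helper name print_sock_info is made up for illustration.

#include <stdio.h>
#include <sys/socket.h>

int print_sock_info(int fd)
{
    int type = 0, rcvbuf = 0;
    socklen_t len = sizeof(type);

    /* SO_TYPE returns the socket type, e.g. SOCK_STREAM or SOCK_DGRAM. */
    if (getsockopt(fd, SOL_SOCKET, SO_TYPE, &type, &len) == -1)
    {
        perror("getsockopt SO_TYPE");
        return -1;
    }

    len = sizeof(rcvbuf);
    /* SO_RCVBUF returns the current receive buffer size in bytes. */
    if (getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, &len) == -1)
    {
        perror("getsockopt SO_RCVBUF");
        return -1;
    }

    printf("type=%d rcvbuf=%d bytes\n", type, rcvbuf);
    return 0;
}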


Detailed optname values:
SOL_SOCKET

| Option | Description | Data type |
|---|---|---|
| SO_BROADCAST | permit sending of broadcast datagrams | int |
| SO_DEBUG | enable debugging | int |
| SO_DONTROUTE | bypass routing table lookup | int |
| SO_ERROR | get (and clear) the pending socket error | int |
| SO_KEEPALIVE | keep the connection alive with periodic probes | int |
| SO_LINGER | linger on close | struct linger |
| SO_OOBINLINE | place out-of-band data into the normal data stream | int |
| SO_RCVBUF | receive buffer size | int |
| SO_SNDBUF | send buffer size | int |
| SO_RCVLOWAT | receive low-water mark | int |
| SO_SNDLOWAT | send low-water mark | int |
| SO_RCVTIMEO | receive timeout | struct timeval |
| SO_SNDTIMEO | send timeout | struct timeval |
| SO_REUSEADDR | allow reuse of the local address and port | int |
| SO_TYPE | get the socket type | int |
| SO_BSDCOMPAT | BSD compatibility | int |

IPPROTO_IP

| Option | Description | Data type |
|---|---|---|
| IP_HDRINCL | the IP header is included with the data | int |
| IP_OPTIONS | IP header options | int |
| IP_TOS | type of service | int |
| IP_TTL | time to live | int |

IPPROTO_TCP

| Option | Description | Data type |
|---|---|---|
| TCP_MAXSEG | TCP maximum segment size | int |
| TCP_NODELAY | disable the Nagle algorithm | int |

These tables are excerpted from reference material.

Let's look at a few concrete usages.

1. Setting send and receive timeouts

Both receiving and sending can be given a timeout.
Method:

int set_sock_time(int fd, int read_sec, int write_sec)
{
    struct timeval send_timeval;
    struct timeval recv_timeval;

    if (fd <= 0)
        return -1;

    send_timeval.tv_sec  = write_sec < 0 ? 0 : write_sec;
    send_timeval.tv_usec = 0;
    recv_timeval.tv_sec  = read_sec < 0 ? 0 : read_sec;
    recv_timeval.tv_usec = 0;

    if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &send_timeval, sizeof(send_timeval)) == -1)
    {
        return -1;
    }
    if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &recv_timeval, sizeof(recv_timeval)) == -1)
    {
        return -1;
    }
    return 0;
}
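
With SO_RCVTIMEO set, a blocking read()/recv() that hits the timeout returns -1 with errno set to EAGAIN or EWOULDBLOCK, so the caller has to tell a timeout apart from a real error. A small usage sketch (the helper timed_read is hypothetical; it assumes set_sock_time() above is in the same file):

#include <errno.h>
#include <stdio.h>
#include <unistd.h>

static int timed_read(int fd, char *buf, size_t len)
{
    if (set_sock_time(fd, 5, 3) == -1)       /* 5 s receive timeout, 3 s send timeout */
        return -1;

    ssize_t n = read(fd, buf, len);
    if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
    {
        printf("read timed out\n");
        return 0;                            /* treat a timeout as "no data" */
    }
    return (int)n;
}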


2. Setting buffer sizes

The sizes of the receive and send buffers can be tuned, which in some cases reduces resource consumption.
Method:

int set_sock_bufsize(int fd, int read_size, int write_size)
{
    int nRecvBuf = read_size;   /* requested receive buffer size in bytes */
    int nSendBuf = write_size;  /* requested send buffer size in bytes */

    /* receive buffer */
    if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (const char *)&nRecvBuf, sizeof(int)) == -1)
    {
        return -1;
    }
    /* send buffer */
    if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (const char *)&nSendBuf, sizeof(int)) == -1)
    {
        return -1;
    }
    return 0;
}
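
The kernel does not have to honor the request exactly: on Linux the value passed to SO_SNDBUF/SO_RCVBUF is doubled internally and clamped by net.core.wmem_max/rmem_max, so it is worth reading back what was actually applied. A sketch (show_applied_bufsize is a made-up name; it assumes set_sock_bufsize() above):

#include <stdio.h>
#include <sys/socket.h>

static void show_applied_bufsize(int fd)
{
    int rcv = 0, snd = 0;
    socklen_t len = sizeof(rcv);

    /* ask for 32 KB in each direction, then print what the kernel applied */
    if (set_sock_bufsize(fd, 32 * 1024, 32 * 1024) == -1)
        return;

    getsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcv, &len);
    len = sizeof(snd);
    getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &snd, &len);
    printf("effective rcvbuf=%d sndbuf=%d\n", rcv, snd);
}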


3. Lingering close (SO_LINGER)

This option uses the following structure:

/* Structure used to manipulate the SO_LINGER option.  */
struct linger
{
    int l_onoff;    /* Nonzero to linger on close.  */
    int l_linger;   /* Time to linger.  */
};

Depending on the values of these two members, close() behaves in one of three ways:
(1) l_onoff is 0 (disabled). SO_LINGER has no effect and close() uses its default behavior: it returns immediately, and if the send buffer still holds data, the kernel's TCP keeps trying to deliver it in the background. Since we never wait for the peer's ACKs, there is no guarantee that the peer has received the data, let alone that the peer application has read it. In this case the connection is terminated with the normal four-segment FIN exchange and goes through TIME_WAIT.
(2) l_onoff is nonzero (enabled) and l_linger is 0. close() returns immediately regardless of whether the send buffer holds data; TCP discards whatever remains in the send buffer of the closed socket and sends the peer a reset segment (RST). This gives a server a way to abort a connection instead of shutting it down gracefully.
(3) l_onoff is nonzero (enabled) and l_linger is nonzero: l_linger is the time allowed for a graceful close. If data remains in the socket send buffer, the process sleeps until either all data has been sent and acknowledged (after which the normal termination sequence runs once the descriptor's reference count drops to 0) or the l_linger timeout expires. Checking close()'s return value matters here: if the timer expires before the data is sent and acknowledged, close() fails with EWOULDBLOCK and any data left in the send buffer is discarded. A successful return only tells us that the data (and the FIN) was acknowledged by the peer's TCP; it says nothing about whether the peer application has read the data. If the socket is non-blocking, close() does not wait at all: the data in the send buffer is discarded and the call fails with EWOULDBLOCK (WSAEWOULDBLOCK on Windows).

int set_sock_closelater(int fd, int onoff, int linger)
{
    struct linger linger_setvalue;

    linger_setvalue.l_onoff  = onoff;
    linger_setvalue.l_linger = linger;
    if (setsockopt(fd, SOL_SOCKET, SO_LINGER, (char *)&linger_setvalue, sizeof(linger_setvalue)) == -1)
    {
        return -1;
    }
    return 0;
}
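
As a usage example, an abortive close that skips FIN/TIME_WAIT and sends RST can be requested with l_onoff = 1 and l_linger = 0. A sketch built on the helper above (abort_connection is a made-up name):

#include <unistd.h>

static void abort_connection(int fd)
{
    /* l_onoff = 1, l_linger = 0: close() returns at once, unsent data is
     * discarded and the peer receives RST instead of FIN. */
    set_sock_closelater(fd, 1, 0);
    close(fd);
}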


4. Restarting a service (address reuse)

When writing servers, the bind() call often fails with the following error:

Address already in use

When that happens, netstat shows the port in the following state:

[root@localhost ~]#  netstat -tna | grep 80
tcp        0      0 192.168.32.94:80        192.168.31.2:1061       TIME_WAIT 

Generally, after a connection is torn down the port stays unavailable for about two minutes; the side that calls close() first enters the TIME_WAIT state.
SO_REUSEADDR lets the port be reused immediately after it is released.
More precisely, SO_REUSEADDR allows binding to an address/port pair whose previous TCP socket is still in TIME_WAIT. A server program should always set the SO_REUSEADDR option before calling bind().

Method:

int set_sock_reuse(int fd)
{
    int on = 1;

    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
    {
        return -1;
    }
    return 0;
}
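
The only ordering requirement is that the option is set between socket() and bind(). A sketch of that call order, reusing set_sock_reuse() from above (make_listen_fd is a made-up name):

#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>

static int make_listen_fd(unsigned short port)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0)
        return -1;

    set_sock_reuse(fd);                      /* before bind(), as noted above */

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family      = AF_INET;
    addr.sin_port        = htons(port);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0)
        return -1;
    return fd;
}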


5. TCP keepalive

Most articles online do not recommend relying on this mechanism alone, but it is still worth learning.

/* Set TCP keep alive option to detect dead peers. The interval option
 * is only used for Linux as we are using Linux-specific APIs to set
 * the probe send time, interval, and count. */
int anetKeepAlive(char *err, int fd, int interval)
{
    int val = 1;

    /* Enable the keepalive mechanism. */
    if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) == -1)
    {
        anetSetError(err, "setsockopt SO_KEEPALIVE: %s", strerror(errno));
        return ANET_ERR;
    }

#ifdef __linux__
    /* Default settings are more or less garbage, with the keepalive time
     * set to 7200 by default on Linux. Modify settings to make the feature
     * actually useful. */

    /* Send first probe after interval. */
    val = interval;
    if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &val, sizeof(val)) < 0) {
        anetSetError(err, "setsockopt TCP_KEEPIDLE: %s\n", strerror(errno));
        return ANET_ERR;
    }

    /* Send next probes after the specified interval. Note that we set the
     * delay as interval / 3, as we send three probes before detecting
     * an error (see the next setsockopt call). */
    val = interval / 3;
    if (val == 0) val = 1;
    if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &val, sizeof(val)) < 0) {
        anetSetError(err, "setsockopt TCP_KEEPINTVL: %s\n", strerror(errno));
        return ANET_ERR;
    }

    /* Consider the socket in error state after we send three ACK
     * probes without getting a reply. */
    val = 3;
    if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &val, sizeof(val)) < 0) {
        anetSetError(err, "setsockopt TCP_KEEPCNT: %s\n", strerror(errno));
        return ANET_ERR;
    }
#endif

    return ANET_OK;
}
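
anetSetError, ANET_ERR and ANET_OK are helpers from Redis's anet.c, which this function appears to be adapted from; they are not defined here. If you just want to confirm that the values took effect, the Linux-specific options can be read back with getsockopt() -- a small sketch (show_keepalive is a made-up name):

#include <stdio.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>

static void show_keepalive(int fd)
{
    int idle = 0, intvl = 0, cnt = 0;
    socklen_t len = sizeof(int);

    /* Linux-specific: time before the first probe, interval between probes,
     * and number of unanswered probes before the connection is dropped. */
    getsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE,  &idle,  &len);
    getsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &intvl, &len);
    getsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT,   &cnt,   &len);
    printf("keepalive: idle=%ds interval=%ds probes=%d\n", idle, intvl, cnt);
}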


Multithreaded sockets

In multithreaded socket programming, each client's messages are handled by a dedicated child thread. The thread is created detached, so when it finishes it releases its own resources automatically.

Server code

#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <getopt.h>
#include <pthread.h>
#include <ctype.h>

#define BACKLOG     13
#define SERVER_PORT 80

static int set_sock_time(int fd, int read_sec, int write_sec)
{
    struct timeval send_timeval;
    struct timeval recv_timeval;

    if (fd <= 0)
    {
        printf("[ERROR] invalid socket fd\n");
        return -1;
    }

    send_timeval.tv_sec  = write_sec < 0 ? 0 : write_sec;
    send_timeval.tv_usec = 0;
    recv_timeval.tv_sec  = read_sec < 0 ? 0 : read_sec;
    recv_timeval.tv_usec = 0;

    if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &send_timeval, sizeof(send_timeval)) == -1)
    {
        printf("[ERROR] failed to set send timeout [%s]\n", strerror(errno));
        return -1;
    }
    if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &recv_timeval, sizeof(recv_timeval)) == -1)
    {
        printf("[ERROR] failed to set receive timeout [%s]\n", strerror(errno));
        return -1;
    }
    return 0;
}

static int set_sock_bufsize(int fd, int read_size, int write_size)
{
    int nRecvBuf = read_size;   /* requested receive buffer size in bytes */
    int nSendBuf = write_size;  /* requested send buffer size in bytes */

    /* receive buffer */
    if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (const char *)&nRecvBuf, sizeof(int)) == -1)
    {
        printf("[ERROR] failed to set receive buffer size [%s]\n", strerror(errno));
        return -1;
    }
    /* send buffer */
    if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (const char *)&nSendBuf, sizeof(int)) == -1)
    {
        printf("[ERROR] failed to set send buffer size [%s]\n", strerror(errno));
        return -1;
    }
    return 0;
}

static int set_sock_closelater(int fd, int onoff, int linger)
{
    struct linger linger_setvalue;

    linger_setvalue.l_onoff  = onoff;
    linger_setvalue.l_linger = linger;
    if (setsockopt(fd, SOL_SOCKET, SO_LINGER, (char *)&linger_setvalue, sizeof(linger_setvalue)) == -1)
    {
        printf("[ERROR] failed to set SO_LINGER [%s]\n", strerror(errno));
        return -1;
    }
    return 0;
}

static int set_sock_reuse(int fd)
{
    int on = 1;

    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
    {
        printf("[ERROR] failed to set SO_REUSEADDR [%s]\n", strerror(errno));
        return -1;
    }
    return 0;
}

static int socket_init(char *listen_ip, int listen_port)
{
    int                listenfd;
    struct sockaddr_in servaddr;

    if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
    {
        printf("[ERROR] failed to create server TCP socket [%s]\n", strerror(errno));
        return -1;
    }
    printf("Server TCP socket [%d] created\n", listenfd);

    if (set_sock_reuse(listenfd) < 0)
    {
        return -2;
    }

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port   = htons(listen_port);
    if (!listen_ip)
    {
        servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    }
    else
    {
        servaddr.sin_addr.s_addr = inet_addr(listen_ip);
    }

    if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0)
    {
        printf("[ERROR] socket [%d] failed to bind port [%d] [%s]\n", listenfd, listen_port, strerror(errno));
        return -2;
    }
    printf("Socket [%d] bound to port [%d]\n", listenfd, listen_port);

    listen(listenfd, BACKLOG);
    printf("Listening on port [%d]\n", listen_port);

    return listenfd;
}

/* start_routine is a function pointer: a function taking void * and returning void *. */
static int thread_init(pthread_t *thread_id, void *(*start_routine)(void *), void *arg)
{
    pthread_attr_t thread_attr;

    if (pthread_attr_init(&thread_attr))                /* initialize thread attributes */
    {
        printf("[ERROR] failed to init thread attributes [%s]\n", strerror(errno));
        return -1;
    }
    if (pthread_attr_setstacksize(&thread_attr, 120 * 1024))
    {
        printf("[ERROR] failed to set stack size [%s]\n", strerror(errno));
        return -2;
    }
    if (pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED))
    {
        printf("[ERROR] failed to set detached attribute [%s]\n", strerror(errno));
        return -3;
    }
    if (pthread_create(thread_id, &thread_attr, start_routine, arg))   /* create the worker thread */
    {
        printf("[ERROR] failed to create thread [%s]\n", strerror(errno));
        return -4;
    }
    return 0;
}

/* Worker thread: exchanges data with one client. */
static void *thread_worker(void *ctx)
{
    int  cli_fd;
    int  rv;
    char buf[1024];
    int  i;

    if (!ctx)
    {
        printf("[ERROR] invalid client socket, thread exiting\n");
        pthread_exit(NULL);
    }
    cli_fd = *(int *)ctx;   /* ctx carries the address of the client fd */

    printf("Worker thread starts communicating\n");
    while (1)
    {
        memset(buf, 0, sizeof(buf));
        rv = read(cli_fd, buf, sizeof(buf));
        if (rv < 0)
        {
            printf("[ERROR] failed to read from client [%d] [%s], thread exiting\n", cli_fd, strerror(errno));
            close(cli_fd);
            pthread_exit(NULL);
        }
        else if (rv == 0)
        {
            printf("Client [%d] disconnected, thread exiting\n", cli_fd);
            close(cli_fd);
            pthread_exit(NULL);
        }
        else
        {
            printf("From client [%d] read [%d] bytes: [%s]\n", cli_fd, rv, buf);
        }

        /* convert letters from lowercase to uppercase and echo them back */
        for (i = 0; i < rv; i++)
        {
            buf[i] = toupper(buf[i]);
        }
        rv = write(cli_fd, buf, rv);
        if (rv < 0)
        {
            printf("[ERROR] failed to reply to client [%d] [%s], thread exiting\n", cli_fd, strerror(errno));
            close(cli_fd);
            pthread_exit(NULL);
        }
    }
}

int main(int argc, char **argv)
{
    int                listen_fd;
    int                clifd = -1;
    struct sockaddr_in cliaddr;
    socklen_t          cliaddr_len = sizeof(cliaddr);
    pthread_t          tid;

    if ((listen_fd = socket_init(NULL, SERVER_PORT)) < 0)   /* returns the listening fd */
    {
        printf("socket init failed: %s\n", strerror(errno));
        return -2;
    }

    while (1)
    {
        printf("Waiting for a new connection...\n");
        clifd = accept(listen_fd, (struct sockaddr *)&cliaddr, &cliaddr_len);   /* main thread accepts new clients */
        if (clifd < 0)
        {
            printf("accept() failed: %s\n", strerror(errno));
            continue;
        }
        printf("New client connected [%s:%d]\n", inet_ntoa(cliaddr.sin_addr), ntohs(cliaddr.sin_port));

        /* Create a detached worker thread for this client.
         * Note: &clifd points at a local variable that the next accept() overwrites;
         * a thread that has not copied it yet may read the wrong fd. */
        thread_init(&tid, thread_worker, &clifd);
    }
}

For the client side I used an off-the-shelf tool.

[root@localhost test]# ./socket_pthread 
Server TCP socket [3] created
Socket [3] bound to port [80]
Listening on port [80]
Waiting for a new connection...
New client connected [192.168.31.2:5784]
Waiting for a new connection...
Worker thread starts communicating
New client connected [192.168.31.2:5785]
Waiting for a new connection...
Worker thread starts communicating
New client connected [192.168.31.2:5786]
Waiting for a new connection...
Worker thread starts communicating
From client [4] read [4] bytes: [2345]
From client [6] read [4] bytes: [2345]
From client [5] read [4] bytes: [2345]
From client [4] read [4] bytes: [2345]
From client [6] read [4] bytes: [2345]
From client [5] read [4] bytes: [2345]
Client [6] disconnected, thread exiting
Client [5] disconnected, thread exiting
Client [4] disconnected, thread exiting
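
The client in this run was an off-the-shelf tool; if you would rather drive the server from code, a minimal hypothetical client could look like this (the IP and port are assumptions, adjust them to your environment):

#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void)
{
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in srv;

    memset(&srv, 0, sizeof(srv));
    srv.sin_family      = AF_INET;
    srv.sin_port        = htons(80);                     /* same port as the server */
    srv.sin_addr.s_addr = inet_addr("127.0.0.1");        /* adjust to the server IP */

    if (connect(fd, (struct sockaddr *)&srv, sizeof(srv)) < 0)
    {
        perror("connect");
        return 1;
    }

    const char *msg = "hello";
    write(fd, msg, strlen(msg));                         /* server echoes it back in uppercase */

    char buf[128] = {0};
    ssize_t n = read(fd, buf, sizeof(buf) - 1);
    if (n > 0)
        printf("server replied: %s\n", buf);

    close(fd);
    return 0;
}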

Thread pool

Thread pools are often described as the more advanced approach, so let's combine one with the server.


#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <getopt.h>
#include <pthread.h>
#include <ctype.h>

#define BACKLOG     13
#define SERVER_PORT 80

/* one task in the queue */
typedef struct Task
{
    void (*function)(void *arg);
    void  *arg;
} Task;

/* thread pool */
struct ThreadPool
{
    /* task queue */
    Task *taskQ;
    int   queueCapacity;        /* capacity */
    int   queueSize;            /* current number of tasks */
    int   queueFront;           /* head -> where tasks are taken from */
    int   queueRear;            /* tail -> where tasks are added */

    pthread_t  managerID;       /* manager thread id */
    pthread_t *threadIDs;       /* worker thread ids */
    int        minNum;          /* minimum number of threads */
    int        maxNum;          /* maximum number of threads */
    int        busyNum;         /* number of busy threads */
    int        liveNum;         /* number of alive threads */
    int        exitNum;         /* number of threads to destroy */

    pthread_mutex_t mutexPool;  /* lock for the whole pool */
    pthread_cond_t  notFull;    /* is the task queue full? */
    pthread_cond_t  notEmpty;   /* is the task queue empty? */
    pthread_mutex_t mutexBusy;  /* lock for busyNum */

    int shutdown;               /* destroy the pool? 1 = yes, 0 = no */
};
typedef struct ThreadPool ThreadPool;

void threadExit(ThreadPool *pool)
{
    pthread_t tid = pthread_self();
    for (int i = 0; i < pool->maxNum; ++i)
    {
        if (pool->threadIDs[i] == tid)
        {
            pool->threadIDs[i] = 0;
            printf("threadExit() called, %ld exiting...\n", tid);
            break;
        }
    }
    pthread_exit(NULL);
}

void *worker(void *arg)
{
    ThreadPool *pool = (ThreadPool *)arg;

    while (1)
    {
        pthread_mutex_lock(&pool->mutexPool);
        /* wait while the task queue is empty */
        while (pool->queueSize == 0 && !pool->shutdown)
        {
            /* block the worker thread */
            pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);

            /* should this thread destroy itself? */
            if (pool->exitNum > 0)
            {
                pool->exitNum--;
                if (pool->liveNum > pool->minNum)
                {
                    pool->liveNum--;
                    pthread_mutex_unlock(&pool->mutexPool);
                    threadExit(pool);
                }
            }
        }

        /* has the pool been shut down? */
        if (pool->shutdown)
        {
            pthread_mutex_unlock(&pool->mutexPool);
            threadExit(pool);
        }

        /* take one task from the queue */
        Task task;
        task.function = pool->taskQ[pool->queueFront].function;
        task.arg      = pool->taskQ[pool->queueFront].arg;
        /* advance the head */
        pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
        pool->queueSize--;
        /* unlock */
        pthread_cond_signal(&pool->notFull);
        pthread_mutex_unlock(&pool->mutexPool);

        printf("thread %ld start working...\n", pthread_self());
        pthread_mutex_lock(&pool->mutexBusy);
        pool->busyNum++;
        pthread_mutex_unlock(&pool->mutexBusy);

        task.function(task.arg);
        /* free(task.arg); */
        task.arg = NULL;

        printf("thread %ld end working...\n", pthread_self());
        pthread_mutex_lock(&pool->mutexBusy);
        pool->busyNum--;
        pthread_mutex_unlock(&pool->mutexBusy);
    }
    return NULL;
}

void *manager(void *arg)
{
    ThreadPool *pool = (ThreadPool *)arg;

    while (!pool->shutdown)
    {
        /* check every 3 seconds */
        sleep(3);

        /* snapshot the number of tasks and alive threads */
        pthread_mutex_lock(&pool->mutexPool);
        int queueSize = pool->queueSize;
        int liveNum   = pool->liveNum;
        pthread_mutex_unlock(&pool->mutexPool);

        /* snapshot the number of busy threads */
        pthread_mutex_lock(&pool->mutexBusy);
        int busyNum = pool->busyNum;
        pthread_mutex_unlock(&pool->mutexBusy);

        const int NUMBER = 2;

        /* add threads:
         * tasks > alive threads && alive threads < max threads */
        if (queueSize > liveNum && liveNum < pool->maxNum)
        {
            pthread_mutex_lock(&pool->mutexPool);
            int counter = 0;
            for (int i = 0; i < pool->maxNum && counter < NUMBER
                 && pool->liveNum < pool->maxNum; ++i)
            {
                if (pool->threadIDs[i] == 0)
                {
                    pthread_create(&pool->threadIDs[i], NULL, worker, pool);
                    counter++;
                    pool->liveNum++;
                }
            }
            pthread_mutex_unlock(&pool->mutexPool);
        }

        /* destroy threads:
         * busy threads * 2 < alive threads && alive threads > min threads */
        if (busyNum * 2 < liveNum && liveNum > pool->minNum)
        {
            pthread_mutex_lock(&pool->mutexPool);
            pool->exitNum = NUMBER;
            pthread_mutex_unlock(&pool->mutexPool);
            /* ask idle workers to exit on their own */
            for (int i = 0; i < NUMBER; ++i)
            {
                pthread_cond_signal(&pool->notEmpty);
            }
        }
    }
    return NULL;
}

/* min       minimum number of threads
 * max       maximum number of threads
 * queueSize task queue capacity */
ThreadPool *threadPoolCreate(int min, int max, int queueSize)
{
    ThreadPool *pool = (ThreadPool *)malloc(sizeof(ThreadPool));
    do
    {
        if (pool == NULL)
        {
            printf("malloc threadpool fail...\n");
            break;
        }
        pool->taskQ = NULL;     /* so the cleanup below never frees an uninitialized pointer */

        /* allocate max pthread_t thread ids */
        pool->threadIDs = (pthread_t *)malloc(sizeof(pthread_t) * max);
        if (pool->threadIDs == NULL)
        {
            printf("malloc threadIDs fail...\n");
            break;
        }
        memset(pool->threadIDs, 0, sizeof(pthread_t) * max);

        pool->minNum  = min;
        pool->maxNum  = max;
        pool->busyNum = 0;      /* busy threads */
        pool->liveNum = min;    /* starts equal to the minimum */
        pool->exitNum = 0;      /* threads to destroy */

        /* initialize mutexes and condition variables */
        if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
            pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
            pthread_cond_init(&pool->notEmpty, NULL)   != 0 ||
            pthread_cond_init(&pool->notFull, NULL)    != 0)
        {
            printf("mutex or condition init fail...\n");
            break;
        }

        /* task queue */
        pool->taskQ         = (Task *)malloc(sizeof(Task) * queueSize);
        pool->queueCapacity = queueSize;
        pool->queueSize     = 0;    /* current number of tasks */
        pool->queueFront    = 0;    /* head */
        pool->queueRear     = 0;    /* tail */

        pool->shutdown = 0;         /* pool is running */

        /* create the manager thread */
        pthread_create(&pool->managerID, NULL, manager, pool);
        /* create min worker threads up front */
        for (int i = 0; i < min; ++i)
        {
            pthread_create(&pool->threadIDs[i], NULL, worker, pool);
        }
        return pool;
    } while (0);

    /* clean up on failure */
    if (pool && pool->threadIDs) free(pool->threadIDs);
    if (pool && pool->taskQ) free(pool->taskQ);
    if (pool) free(pool);
    return NULL;
}

int threadPoolDestroy(ThreadPool *pool)
{
    if (pool == NULL)
    {
        return -1;
    }

    /* shut the pool down */
    pool->shutdown = 1;
    /* join the manager thread */
    pthread_join(pool->managerID, NULL);
    /* wake up the blocked worker (consumer) threads */
    for (int i = 0; i < pool->liveNum; ++i)
    {
        pthread_cond_signal(&pool->notEmpty);
    }

    /* release heap memory */
    if (pool->taskQ)
    {
        free(pool->taskQ);
    }
    if (pool->threadIDs)
    {
        free(pool->threadIDs);
    }
    pthread_mutex_destroy(&pool->mutexPool);
    pthread_mutex_destroy(&pool->mutexBusy);
    pthread_cond_destroy(&pool->notEmpty);
    pthread_cond_destroy(&pool->notFull);

    free(pool);
    pool = NULL;
    return 0;
}

void threadPoolAdd(ThreadPool *pool, void (*func)(void *), void *arg)
{
    pthread_mutex_lock(&pool->mutexPool);
    while (pool->queueSize == pool->queueCapacity && !pool->shutdown)
    {
        /* block the producer thread */
        pthread_cond_wait(&pool->notFull, &pool->mutexPool);
    }
    if (pool->shutdown)
    {
        pthread_mutex_unlock(&pool->mutexPool);
        return;
    }

    /* add the task */
    pool->taskQ[pool->queueRear].function = func;
    pool->taskQ[pool->queueRear].arg      = arg;
    pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
    pool->queueSize++;

    pthread_cond_signal(&pool->notEmpty);
    pthread_mutex_unlock(&pool->mutexPool);
}

int threadPoolBusyNum(ThreadPool *pool)
{
    pthread_mutex_lock(&pool->mutexBusy);
    int busyNum = pool->busyNum;
    pthread_mutex_unlock(&pool->mutexBusy);
    return busyNum;
}

int threadPoolAliveNum(ThreadPool *pool)
{
    pthread_mutex_lock(&pool->mutexPool);
    int aliveNum = pool->liveNum;
    pthread_mutex_unlock(&pool->mutexPool);
    return aliveNum;
}

static int set_sock_time(int fd, int read_sec, int write_sec)
{
    struct timeval send_timeval;
    struct timeval recv_timeval;

    if (fd <= 0)
    {
        printf("[ERROR] invalid socket fd\n");
        return -1;
    }

    send_timeval.tv_sec  = write_sec < 0 ? 0 : write_sec;
    send_timeval.tv_usec = 0;
    recv_timeval.tv_sec  = read_sec < 0 ? 0 : read_sec;
    recv_timeval.tv_usec = 0;

    if (setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &send_timeval, sizeof(send_timeval)) == -1)
    {
        printf("[ERROR] failed to set send timeout [%s]\n", strerror(errno));
        return -1;
    }
    if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &recv_timeval, sizeof(recv_timeval)) == -1)
    {
        printf("[ERROR] failed to set receive timeout [%s]\n", strerror(errno));
        return -1;
    }
    return 0;
}

static int set_sock_bufsize(int fd, int read_size, int write_size)
{
    int nRecvBuf = read_size;   /* requested receive buffer size in bytes */
    int nSendBuf = write_size;  /* requested send buffer size in bytes */

    /* receive buffer */
    if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (const char *)&nRecvBuf, sizeof(int)) == -1)
    {
        printf("[ERROR] failed to set receive buffer size [%s]\n", strerror(errno));
        return -1;
    }
    /* send buffer */
    if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (const char *)&nSendBuf, sizeof(int)) == -1)
    {
        printf("[ERROR] failed to set send buffer size [%s]\n", strerror(errno));
        return -1;
    }
    return 0;
}

static int set_sock_closelater(int fd, int onoff, int linger)
{
    struct linger linger_setvalue;

    linger_setvalue.l_onoff  = onoff;
    linger_setvalue.l_linger = linger;
    if (setsockopt(fd, SOL_SOCKET, SO_LINGER, (char *)&linger_setvalue, sizeof(linger_setvalue)) == -1)
    {
        printf("[ERROR] failed to set SO_LINGER [%s]\n", strerror(errno));
        return -1;
    }
    return 0;
}

static int set_sock_reuse(int fd)
{
    int on = 1;

    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
    {
        printf("[ERROR] failed to set SO_REUSEADDR [%s]\n", strerror(errno));
        return -1;
    }
    return 0;
}

static int socket_init(char *listen_ip, int listen_port)
{
    int                listenfd;
    struct sockaddr_in servaddr;

    if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
    {
        printf("[ERROR] failed to create server TCP socket [%s]\n", strerror(errno));
        return -1;
    }
    printf("Server TCP socket [%d] created\n", listenfd);

    if (set_sock_reuse(listenfd) < 0)
    {
        return -2;
    }

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port   = htons(listen_port);
    if (!listen_ip)
    {
        servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    }
    else
    {
        servaddr.sin_addr.s_addr = inet_addr(listen_ip);
    }

    if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0)
    {
        printf("[ERROR] socket [%d] failed to bind port [%d] [%s]\n", listenfd, listen_port, strerror(errno));
        return -2;
    }
    printf("Socket [%d] bound to port [%d]\n", listenfd, listen_port);

    listen(listenfd, BACKLOG);
    printf("Listening on port [%d]\n", listen_port);

    return listenfd;
}

/* Kept from the previous version; unused here because the pool replaces per-client threads. */
static int thread_init(pthread_t *thread_id, void *(*start_routine)(void *), void *arg)
{
    pthread_attr_t thread_attr;

    if (pthread_attr_init(&thread_attr))
    {
        printf("[ERROR] failed to init thread attributes [%s]\n", strerror(errno));
        return -1;
    }
    if (pthread_attr_setstacksize(&thread_attr, 120 * 1024))
    {
        printf("[ERROR] failed to set stack size [%s]\n", strerror(errno));
        return -2;
    }
    if (pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED))
    {
        printf("[ERROR] failed to set detached attribute [%s]\n", strerror(errno));
        return -3;
    }
    if (pthread_create(thread_id, &thread_attr, start_routine, arg))
    {
        printf("[ERROR] failed to create thread [%s]\n", strerror(errno));
        return -4;
    }
    return 0;
}

/* Task function run by the pool: exchanges data with one client.
 * Its signature matches the pool's task type void (*)(void *). */
static void thread_worker(void *ctx)
{
    int  cli_fd;
    int  rv;
    char buf[1024];
    int  i;

    if (!ctx)
    {
        printf("[ERROR] invalid client socket, task exiting\n");
        return;
    }
    cli_fd = *(int *)ctx;   /* ctx carries the address of the client fd */

    printf("Worker thread starts communicating\n");
    while (1)
    {
        memset(buf, 0, sizeof(buf));
        rv = read(cli_fd, buf, sizeof(buf));
        if (rv < 0)
        {
            printf("[ERROR] failed to read from client [%d] [%s], task exiting\n", cli_fd, strerror(errno));
            close(cli_fd);
            break;
        }
        else if (rv == 0)
        {
            printf("Client [%d] disconnected, task exiting\n", cli_fd);
            close(cli_fd);
            break;
        }
        else
        {
            printf("From client [%d] read [%d] bytes: [%s]\n", cli_fd, rv, buf);
        }

        /* convert letters from lowercase to uppercase and echo them back */
        for (i = 0; i < rv; i++)
        {
            buf[i] = toupper(buf[i]);
        }
        rv = write(cli_fd, buf, rv);
        if (rv < 0)
        {
            printf("[ERROR] failed to reply to client [%d] [%s], task exiting\n", cli_fd, strerror(errno));
            close(cli_fd);
            break;
        }
    }
}

int main(int argc, char **argv)
{
    int                listen_fd;
    int                clifd = -1;
    struct sockaddr_in cliaddr;
    socklen_t          cliaddr_len = sizeof(cliaddr);

    /* create the thread pool: 3 to 10 threads, queue capacity 100 */
    ThreadPool *pool = threadPoolCreate(3, 10, 100);

    if ((listen_fd = socket_init(NULL, SERVER_PORT)) < 0)
    {
        printf("socket init failed: %s\n", strerror(errno));
        return -2;
    }

    while (1)
    {
        printf("Waiting for a new connection...\n");
        clifd = accept(listen_fd, (struct sockaddr *)&cliaddr, &cliaddr_len);
        if (clifd < 0)
        {
            printf("accept() failed: %s\n", strerror(errno));
            continue;
        }
        printf("New client connected [%s:%d]\n", inet_ntoa(cliaddr.sin_addr), ntohs(cliaddr.sin_port));

        /* hand the connection to the pool instead of spawning a thread per client.
         * Note: &clifd points at a local variable that the next accept() overwrites;
         * a worker that has not copied it yet may read the wrong fd. */
        threadPoolAdd(pool, thread_worker, &clifd);
    }
}

However, with clients that loop and keep their connections open, the 3 worker threads are occupied immediately, and the fourth client has to wait on the condition variable. For short-lived connections that send a message and then finish, this approach is a much better fit.

epoll + multithreading

epoll handles socket connections efficiently, so it is a natural replacement for a plain accept()/select() loop; combined with a thread pool for the business logic, it gives fast handling under high concurrency.

#include<stdio.h>
#include<stdlib.h>
#include<assert.h>
#include<unistd.h>
#include<string.h>
#include <errno.h>
#include<sys/types.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<sys/socket.h>
#include<sys/epoll.h>
#include <pthread.h>
#include <ctype.h>

#define MAX         1024
#define listen_port 80

/* one task in the queue */
typedef struct Task
{
    void (*function)(void *arg);
    void  *arg;
} Task;

/* thread pool */
struct ThreadPool
{
    /* task queue */
    Task *taskQ;
    int   queueCapacity;        /* capacity */
    int   queueSize;            /* current number of tasks */
    int   queueFront;           /* head -> where tasks are taken from */
    int   queueRear;            /* tail -> where tasks are added */

    pthread_t  managerID;       /* manager thread id */
    pthread_t *threadIDs;       /* worker thread ids */
    int        minNum;          /* minimum number of threads */
    int        maxNum;          /* maximum number of threads */
    int        busyNum;         /* number of busy threads */
    int        liveNum;         /* number of alive threads */
    int        exitNum;         /* number of threads to destroy */

    pthread_mutex_t mutexPool;  /* lock for the whole pool */
    pthread_cond_t  notFull;    /* is the task queue full? */
    pthread_cond_t  notEmpty;   /* is the task queue empty? */
    pthread_mutex_t mutexBusy;  /* lock for busyNum */

    int shutdown;               /* destroy the pool? 1 = yes, 0 = no */
};
typedef struct ThreadPool ThreadPool;

void threadExit(ThreadPool *pool)
{
    pthread_t tid = pthread_self();
    for (int i = 0; i < pool->maxNum; ++i)
    {
        if (pool->threadIDs[i] == tid)
        {
            pool->threadIDs[i] = 0;
            printf("threadExit() called, %ld exiting...\n", tid);
            break;
        }
    }
    pthread_exit(NULL);
}

void *worker(void *arg)
{
    ThreadPool *pool = (ThreadPool *)arg;

    while (1)
    {
        pthread_mutex_lock(&pool->mutexPool);
        /* wait while the task queue is empty */
        while (pool->queueSize == 0 && !pool->shutdown)
        {
            /* block the worker thread */
            pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);

            /* should this thread destroy itself? */
            if (pool->exitNum > 0)
            {
                pool->exitNum--;
                if (pool->liveNum > pool->minNum)
                {
                    pool->liveNum--;
                    pthread_mutex_unlock(&pool->mutexPool);
                    threadExit(pool);
                }
            }
        }

        /* has the pool been shut down? */
        if (pool->shutdown)
        {
            pthread_mutex_unlock(&pool->mutexPool);
            threadExit(pool);
        }

        /* take one task from the queue */
        Task task;
        task.function = pool->taskQ[pool->queueFront].function;
        task.arg      = pool->taskQ[pool->queueFront].arg;
        /* advance the head */
        pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
        pool->queueSize--;
        /* unlock */
        pthread_cond_signal(&pool->notFull);
        pthread_mutex_unlock(&pool->mutexPool);

        printf("thread %ld start working...\n", pthread_self());
        pthread_mutex_lock(&pool->mutexBusy);
        pool->busyNum++;
        pthread_mutex_unlock(&pool->mutexBusy);

        task.function(task.arg);
        /* free(task.arg); */
        task.arg = NULL;

        printf("thread %ld end working...\n", pthread_self());
        pthread_mutex_lock(&pool->mutexBusy);
        pool->busyNum--;
        pthread_mutex_unlock(&pool->mutexBusy);
    }
    return NULL;
}

void *manager(void *arg)
{
    ThreadPool *pool = (ThreadPool *)arg;

    while (!pool->shutdown)
    {
        /* check every 3 seconds */
        sleep(3);

        /* snapshot the number of tasks and alive threads */
        pthread_mutex_lock(&pool->mutexPool);
        int queueSize = pool->queueSize;
        int liveNum   = pool->liveNum;
        pthread_mutex_unlock(&pool->mutexPool);

        /* snapshot the number of busy threads */
        pthread_mutex_lock(&pool->mutexBusy);
        int busyNum = pool->busyNum;
        pthread_mutex_unlock(&pool->mutexBusy);

        const int NUMBER = 2;

        /* add threads:
         * tasks > alive threads && alive threads < max threads */
        if (queueSize > liveNum && liveNum < pool->maxNum)
        {
            pthread_mutex_lock(&pool->mutexPool);
            int counter = 0;
            for (int i = 0; i < pool->maxNum && counter < NUMBER
                 && pool->liveNum < pool->maxNum; ++i)
            {
                if (pool->threadIDs[i] == 0)
                {
                    pthread_create(&pool->threadIDs[i], NULL, worker, pool);
                    counter++;
                    pool->liveNum++;
                }
            }
            pthread_mutex_unlock(&pool->mutexPool);
        }

        /* destroy threads:
         * busy threads * 2 < alive threads && alive threads > min threads */
        if (busyNum * 2 < liveNum && liveNum > pool->minNum)
        {
            pthread_mutex_lock(&pool->mutexPool);
            pool->exitNum = NUMBER;
            pthread_mutex_unlock(&pool->mutexPool);
            /* ask idle workers to exit on their own */
            for (int i = 0; i < NUMBER; ++i)
            {
                pthread_cond_signal(&pool->notEmpty);
            }
        }
    }
    return NULL;
}

/* min       minimum number of threads
 * max       maximum number of threads
 * queueSize task queue capacity */
ThreadPool *threadPoolCreate(int min, int max, int queueSize)
{
    ThreadPool *pool = (ThreadPool *)malloc(sizeof(ThreadPool));
    do
    {
        if (pool == NULL)
        {
            printf("malloc threadpool fail...\n");
            break;
        }
        pool->taskQ = NULL;     /* so the cleanup below never frees an uninitialized pointer */

        /* allocate max pthread_t thread ids */
        pool->threadIDs = (pthread_t *)malloc(sizeof(pthread_t) * max);
        if (pool->threadIDs == NULL)
        {
            printf("malloc threadIDs fail...\n");
            break;
        }
        memset(pool->threadIDs, 0, sizeof(pthread_t) * max);

        pool->minNum  = min;
        pool->maxNum  = max;
        pool->busyNum = 0;      /* busy threads */
        pool->liveNum = min;    /* starts equal to the minimum */
        pool->exitNum = 0;      /* threads to destroy */

        /* initialize mutexes and condition variables */
        if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
            pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
            pthread_cond_init(&pool->notEmpty, NULL)   != 0 ||
            pthread_cond_init(&pool->notFull, NULL)    != 0)
        {
            printf("mutex or condition init fail...\n");
            break;
        }

        /* task queue */
        pool->taskQ         = (Task *)malloc(sizeof(Task) * queueSize);
        pool->queueCapacity = queueSize;
        pool->queueSize     = 0;    /* current number of tasks */
        pool->queueFront    = 0;    /* head */
        pool->queueRear     = 0;    /* tail */

        pool->shutdown = 0;         /* pool is running */

        /* create the manager thread */
        pthread_create(&pool->managerID, NULL, manager, pool);
        /* create min worker threads up front */
        for (int i = 0; i < min; ++i)
        {
            pthread_create(&pool->threadIDs[i], NULL, worker, pool);
        }
        return pool;
    } while (0);

    /* clean up on failure */
    if (pool && pool->threadIDs) free(pool->threadIDs);
    if (pool && pool->taskQ) free(pool->taskQ);
    if (pool) free(pool);
    return NULL;
}

int threadPoolDestroy(ThreadPool *pool)
{
    if (pool == NULL)
    {
        return -1;
    }

    /* shut the pool down */
    pool->shutdown = 1;
    /* join the manager thread */
    pthread_join(pool->managerID, NULL);
    /* wake up the blocked worker (consumer) threads */
    for (int i = 0; i < pool->liveNum; ++i)
    {
        pthread_cond_signal(&pool->notEmpty);
    }

    /* release heap memory */
    if (pool->taskQ)
    {
        free(pool->taskQ);
    }
    if (pool->threadIDs)
    {
        free(pool->threadIDs);
    }
    pthread_mutex_destroy(&pool->mutexPool);
    pthread_mutex_destroy(&pool->mutexBusy);
    pthread_cond_destroy(&pool->notEmpty);
    pthread_cond_destroy(&pool->notFull);

    free(pool);
    pool = NULL;
    return 0;
}

void threadPoolAdd(ThreadPool *pool, void (*func)(void *), void *arg)
{
    pthread_mutex_lock(&pool->mutexPool);
    while (pool->queueSize == pool->queueCapacity && !pool->shutdown)
    {
        /* block the producer thread */
        pthread_cond_wait(&pool->notFull, &pool->mutexPool);
    }
    if (pool->shutdown)
    {
        pthread_mutex_unlock(&pool->mutexPool);
        return;
    }

    /* add the task */
    pool->taskQ[pool->queueRear].function = func;
    pool->taskQ[pool->queueRear].arg      = arg;
    pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
    pool->queueSize++;

    pthread_cond_signal(&pool->notEmpty);
    pthread_mutex_unlock(&pool->mutexPool);
}

int threadPoolBusyNum(ThreadPool *pool)
{
    pthread_mutex_lock(&pool->mutexBusy);
    int busyNum = pool->busyNum;
    pthread_mutex_unlock(&pool->mutexBusy);
    return busyNum;
}

int threadPoolAliveNum(ThreadPool *pool)
{
    pthread_mutex_lock(&pool->mutexPool);
    int aliveNum = pool->liveNum;
    pthread_mutex_unlock(&pool->mutexPool);
    return aliveNum;
}

/* Task function run by the pool: handles one readable event on a client socket.
 * Its signature matches the pool's task type void (*)(void *). */
static void thread_worker(void *ctx)
{
    int  cli_fd;
    int  rv;
    char buf[1024];

    if (!ctx)
    {
        printf("[ERROR] invalid client socket\n");
        return;
    }
    cli_fd = *(int *)ctx;   /* ctx carries the address of the client fd */

    printf("Worker thread starts communicating\n");
    memset(buf, 0, sizeof(buf));
    rv = read(cli_fd, buf, sizeof(buf));
    if (rv < 0)
    {
        printf("[ERROR] failed to read from client [%d] [%s]\n", cli_fd, strerror(errno));
        close(cli_fd);
    }
    else if (rv == 0)
    {
        printf("Client [%d] disconnected\n", cli_fd);
        close(cli_fd);
    }
    else
    {
        printf("From client [%d] read [%d] bytes: [%s]\n", cli_fd, rv, buf);
    }
}

int main()
{
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    assert(listenfd != -1);

    struct sockaddr_in ser, cli;
    ser.sin_family      = AF_INET;
    ser.sin_port        = htons(listen_port);
    ser.sin_addr.s_addr = htonl(INADDR_ANY);        /* or inet_addr("127.0.0.1") */

    int res = bind(listenfd, (struct sockaddr *)&ser, sizeof(ser));
    assert(res != -1);
    listen(listenfd, 5);

    int epfd = epoll_create(1);     /* create the kernel event table epfd */
    assert(epfd != -1);

    struct epoll_event ev;
    ev.events  = EPOLLIN;
    ev.data.fd = listenfd;          /* event entry describing listenfd */
    res = epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);    /* register listenfd in the event table */
    assert(res != -1);

    struct epoll_event event[MAX];  /* epoll_wait() returns the ready events in this array */

    /* create the thread pool */
    ThreadPool *pool = threadPoolCreate(10, 100, 100);

    while (1)
    {
        int n = epoll_wait(epfd, event, MAX, -1);   /* core call: returns the number of ready fds */
        if (n == -1)
        {
            printf("error!\n");
            exit(0);
        }
        if (n == 0)
        {
            printf("timeout\n");
            continue;
        }

        for (int i = 0; i < n; ++i)
        {
            int fd = event[i].data.fd;

            if (event[i].events & EPOLLRDHUP)   /* the client closed its end (e.g. typed "end") */
            {
                printf("break\n");
                close(fd);
                epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);   /* remove fd from the event table */
                continue;
            }
            if (event[i].events & EPOLLIN)
            {
                if (fd == listenfd)
                {
                    socklen_t len = sizeof(cli);
                    int c = accept(listenfd, (struct sockaddr *)&cli, &len);
                    assert(c != -1);
                    printf("link success\n");

                    ev.events  = EPOLLIN | EPOLLET;     /* or EPOLLIN | EPOLLRDHUP */
                    ev.data.fd = c;
                    res = epoll_ctl(epfd, EPOLL_CTL_ADD, c, &ev);
                    assert(res != -1);
                }
                else
                {
                    /* Hand the readable socket to the pool.
                     * Note: &fd points at a loop-local variable; see the note below. */
                    threadPoolAdd(pool, thread_worker, &fd);
                }
            }
        }
    }
}

Closing words

Today is the last day before the National Day holiday.

The long-awaited holiday suddenly doesn't feel that exciting. I can't go home either; every time a holiday gets close, an outbreak starts.
Many capitalist countries have already declared the pandemic over, but they must be fooling themselves.
Back to work for a little while longer; time to stand the last watch properly.
