当前位置: 首页 > news >正文

Linux c网络专栏第四章io_uring

1.初识io_uring

(1)io_uring是什么?

从名字上解读,是一个提供给用户处理IO的环形队列,实现用户程序提交完IO请求后,能执行其他代码,不会被阻塞。让内核来异步(线程池)执行真正的IO操作,当IO操作完成,内核会将操作结果存放在完成环形队列,通过如epoll的通知机制让用户程序直接取出IO操作的结果。

(2)能异步的对象有?

不仅包括传统的文件 I/O 操作(如 read、write、open、close 等),还支持网络相关的系统调用,如 send、recv、accept、connect 等,所以可以通过io_uring构建高并发高性能的网络服务器,例如,在一个文件服务器中,当有多个客户端同时请求读取文件时,使用 io_uring 可以让服务器在处理一个客户端的 I/O 请求时,同时处理其他客户端的请求,而不需要等待每个 I/O 请求都完成后再处理下一个,大大提高了服务器的响应速度和吞吐量。

(3)环形队列的机制是?

在初始化 io_uring 时,通过内存映射(mmap)机制,将提交队列(submit queue)和完成队列(complete queue)映射到用户空间和内核空间,从而避免了数据传递需要的系统调用和上下文切换。

(4)io_uring的三个系统调用的作用分别是?

⑴io_uring_setup

接受两个参数,第一个参数是期望的提交队列(SQ)的大小,即队列中可以容纳的 I/O 请求数量;第二个参数是一个结构体指针,该结构体用于返回 io_uring 实例的相关参数io_uring_params,如实际分配的 SQ 和完成队列(CQ)的大小、队列的偏移量等信息。

工作一:为 io_uring 实例分配所需的内存空间,包括 SQ、CQ 以及相关的控制结构。

工作二:返回一个fd,用于操作初始化的这个操作该 io_uring 实例。

⑵io_uring_enter

主要作用是将应用程序准备好的 I/O 请求提交给内核,并可以根据传入的参数选择 不等待直接返回 / 等待至少n个IO有结果返回 /等待全部有结果返回。

⑶io_uring_register

用于注册文件描述符或事件文件描述符到 io_uring 实例中。这样内核在IO处理的时候,可以直接访问这些预先注册的资源,而无需每次都重新设置相关信息,从而提高了 I/O 操作的效率。例如,在进行大量文件读写操作时,预先注册文件描述符可以避免每次提交 I/O 请求时都进行文件描述符的查找和验证。

(5)io_uring的应用场景有?

高性能网络服务器,无需阻塞等待操作完成,可以立即处理其他请求,从而提高了并发处理能力。

数据库系统、存储服务系统等需要频繁地进行读写操作的业务。

(6)submit queue的结点和compelete queue的结点是什么关系?

实际上是同一个结点。但是情况 compelete queue不会影响submit queue队列中的结点。因为不是删除。

(7)io_uring和epoll有哪些不同?

1)io_uring每处理完一个事件之后,需要再通过prepare再设置事件进submit队列,否则用户操作只能有一次。而epoll是持续监听fd的,不需要多次设置事件。

2)epoll 通过内核和用户空间的数据拷贝来传递事件信息,而 io_uring 基于两个共享环形缓冲区,内核和用户可以共同操作

3)epoll主要用于IO多路复用,给出的是可以操作;io_uring不仅用于事件通知,还能直接向IO操作,直接给出结果。

4)难度不同,因为io_uring在每处理完一个事件后需要思考设置新事件,因此逻辑上更困难,而epoll相对简单。

(8)如果有多个客户端同时连接uring_tcp_server如何应对

在监听的sockfd上投递多个accept请求,要并发10个就投递10个。

2.用io_uring实现一个tcp_server

#include <stdio.h>
#include <liburing.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>#define EVENT_ACCEPT   	0
#define EVENT_READ		1
#define EVENT_WRITE		2struct conn_info { //用于关联 连接 与 结点数据int fd; //方便知道compelete queue中取出的结点数据 对应是哪一个连接int event; //方便知道是哪一个事件 进行判断
};int init_server(unsigned short port) {	int sockfd = socket(AF_INET, SOCK_STREAM, 0);	struct sockaddr_in serveraddr;	memset(&serveraddr, 0, sizeof(struct sockaddr_in));	serveraddr.sin_family = AF_INET;	serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);	serveraddr.sin_port = htons(port);	if (-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr))) {		perror("bind");		return -1;	}	listen(sockfd, 10);return sockfd;
}#define ENTRIES_LENGTH		1024
#define BUFFER_LENGTH		1024int set_event_recv(struct io_uring *ring, int sockfd,void *buf, size_t len, int flags) {struct io_uring_sqe *sqe = io_uring_get_sqe(ring);   //创建结点struct conn_info accept_info = {.fd = sockfd,.event = EVENT_READ, };io_uring_prep_recv(sqe, sockfd, buf, len, flags); //加入submit queue 内部有io_uring_registermemcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info)); //给结点绑定信息}int set_event_send(struct io_uring *ring, int sockfd,void *buf, size_t len, int flags) {struct io_uring_sqe *sqe = io_uring_get_sqe(ring);struct conn_info accept_info = {.fd = sockfd,.event = EVENT_WRITE,};io_uring_prep_send(sqe, sockfd, buf, len, flags); //提交到submitqueuememcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));}int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr,socklen_t *addrlen, int flags) {struct io_uring_sqe *sqe = io_uring_get_sqe(ring);struct conn_info accept_info = {.fd = sockfd,.event = EVENT_ACCEPT,};io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)addr, addrlen, flags);memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));}int main(int argc, char *argv[]) {unsigned short port = 9999;int sockfd = init_server(port);struct io_uring_params params;memset(&params, 0, sizeof(params));struct io_uring ring;io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params);//初始化两个队列  内部有io_uring_setup系统调用struct sockaddr_in clientaddr;	socklen_t len = sizeof(clientaddr);set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);char buffer[BUFFER_LENGTH] = {0};while (1) {io_uring_submit(&ring); //提交submit queue的结点到内核  内部有io_uring_enterstruct io_uring_cqe *cqe;io_uring_wait_cqe(&ring, &cqe); //等待compelete queue有结点 并且拿到compelete queue的头指针 cqestruct io_uring_cqe *cqes[128]; //指向从compelete queue中取出的结点int nready = io_uring_peek_batch_cqe(&ring, cqes, 128); //从compelete queue中取出结点  epoll_waitint i = 0;for (i = 0;i < nready;i ++) {struct io_uring_cqe *entries = cqes[i];struct conn_info result;memcpy(&result, &entries->user_data, sizeof(struct conn_info));if (result.event == EVENT_ACCEPT) { //并不是靠事件类型驱动 这只是普通判断分三种情况set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0); //避免连接断开后无法再连接  因为最后会清空compelete queue//  不像epoll持续监听  所以需要再重新加入submit队列int connfd = entries->res;set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0);//避免用户连接后发送数据接收不到} else if (result.event == EVENT_READ) {  int ret = entries->res;//printf("set_event_recv ret: %d, %s\n", ret, buffer); //if (ret == 0) { //避免断开连接后fd = 0 也设置事件一直循环close(result.fd);} else if (ret > 0) {set_event_send(&ring, result.fd, buffer, ret, 0); //收到后设置发送事件}}  else if (result.event == EVENT_WRITE) {//int ret = entries->res;//printf("set_event_send ret: %d, %s\n", ret, buffer);set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0);//发送后再设置回接收事件 用户则可以不停发送}}io_uring_cq_advance(&ring, nready); //清空compelete queue}}

编程细节:

1.关联compelete queue结点数据到具体的连接。

2.连接断开服务端如何感应。参考Posix API文章recv()的具体工作。

3.Accept事件可以多次投递,但是recv事件只能等取出完成队列的结果数据后再投递(多个recv事件非阻塞会竞争rbuffer,导致数据覆盖或者乱序),send可以多次投递。

3.测试epoll与uring的性能

编写一个client_test,作为客户端代码,测试与两个服务器分别发送100w个包,全部接收返回需要的时间,以及qps(服务器 ​每秒能处理的请求数量),包的大小分别是128b、256b、512b。

client_test代码:

#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/time.h>
#include <pthread.h>
#include <arpa/inet.h>typedef struct test_info_t {char serverip[16];int port;int threadnum;int connectnum;int requestnum;int falied;
} test_info;#define MESSAGE "12345\r\n"   //8B (隐含\0终止符)
#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)
#define pkt_size 256  //512 1024 2048
int send_pkt(int fd) {char wbuffer[2048] = {0}; //2KBint i=0;for(i=0;i<pkt_size / strlen(MESSAGE);i++){strcpy(wbuffer + i*strlen(MESSAGE),MESSAGE);}int ret = send(fd, wbuffer, strlen(wbuffer), 0 );if (ret < 0) {exit(1);}return 0;
}int recv_pkt(int fd) {char rbuffer[2048] = {0}; //2KBchar tmp[2048] = {0};int total_size = 0;int recv_size;while((recv_size = recv(fd, tmp, 2048, 0)) > 0){ //当包大小超过1500B的时候 分批接收memcpy(rbuffer + total_size,tmp,recv_size); //不用strcopy 不依赖\0total_size += recv_size;memset(tmp,0,2048);}if (strcmp(rbuffer, wbuffer)) {printf("recv : '%s' != send : '%s'\n", buffer, MESSAGE);return -1;}return 0;
}int connect_tcpserver(const char *ip, unsigned short port) {int connfd = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in server_addr;memset(&server_addr, 0, sizeof(struct sockaddr_in));server_addr.sin_family = AF_INET;server_addr.sin_addr.s_addr = inet_addr(ip);server_addr.sin_port = htons(port);int ret = connect(connfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr_in));if (ret) {perror("connect error\n");return -1;}return connfd;
}static void *thread_callback(void *arg) {test_info info = *(test_info *)arg;int cnt = info.requestnum / info.threadnum;int connfd = connect_tcpserver(info.serverip, info.port);if (connfd < 0) {printf("connect fail\n");info.falied += cnt;return NULL;}int i = 0;for (i = 0; i < cnt; i++) {int res = send_pkt(connfd);if (res) {printf("send fail\n");info.falied++;continue;}res = recv_pkt(connfd);if (res) {printf("recv fail\n");info.falied++;continue;}}return NULL;
}//./client_test -s 192.168.88.9 -p 2048 -t 50 -c 50 -n 1000000
//服务端ip和port  客户端线程数量  连接数量  发送数据包数量
int main(int argc, char *argv[]) {int op;test_info info = {0};while ((op = getopt(argc, argv, "s:p:t:c:n:?")) != -1) {switch (op) {case 's':strcpy(info.serverip, optarg);break;case 'p':info.port = atoi(optarg);break;case 't':info.threadnum = atoi(optarg);break;case 'c':info.connectnum = atoi(optarg);break;case 'n':info.requestnum = atoi(optarg);break;default :return -1;}}struct timeval tv_begin;gettimeofday(&tv_begin, NULL);pthread_t *tid = malloc(info.threadnum * sizeof(pthread_t));int i;for (i = 0; i < info.threadnum; i++) {pthread_create(&tid[i], NULL, thread_callback, &info);}for (i = 0; i < info.threadnum; i++) {pthread_join(tid[i], NULL); //主线程等待子线程全部返回  并且会回收资源 无需free}struct timeval tv_end;gettimeofday(&tv_end, NULL);int time_used = TIME_SUB_MS(tv_end, tv_begin);printf("success: %d, failed: %d, time_used:%d, qps: %d\n", info.requestnum - info.falied, info.falied, time_used,info.requestnum /(time_used / 1000));return 0;
}

性能测试结果:

(1)

耗时相差1s,qps相差1.5w

(2)

耗时相差1.6s,qps相差2.2w

4.网络面试题

1.为什么tcp握手是三次,挥手是四次?

握手:如果握手是两次的话,B一收到A的请求就会进入established状态。可能会有一种情况是A发送的第一个请求被网络阻塞了,随后A又发了第二个请求,并且成功收到了B的ACK,两个人开始通信,通信后关闭连接。但是这时第一个被阻塞的请求到达了B,B进入连接状态等待A发送消息,但是A是close状态。

挥手:如果挥手是三次的话,在A收到B的释放请求后,直接就结束,那B会不知道自己的请求是否被收到,误认为对方还在等自己的释放请求,实际上A已经close了。所以需要对B再发送一次释放请求的确认,保证B到A方向的关闭。

2.udp的并发如何做?

3.tcp和udp的区别

1.前置不同:tcp基于连接,udp基于数据报

2.分包与黏包的解决方案

tcp可以采用在应用层头部加上两个字节记录包大小,比如要发送的buffer前两个数组空间用于标记长度,对方接收先收两字节,再收后面的。或者分隔符,在每固定大小的后面加上/r/n类似的分隔符,让接收方在接收后判断。

udp需要在代码层对每个包进行id标识,并且在代码层实现确认机制

3.并发的做法不一样。udp需要在代码层模拟tcp的操作实现并发,为每一个用户建立一个连接,服务器和用户通过连接并发交互。而tcp原生的accept就能接收多个连接然后epoll管理。

4.业务场景不同。udp可以做游戏的团战或者下载工具,udp必须在实时性和传输量上做出选择,实时性 = 频繁发送小包 (游戏), 传输量 = 间隔发送大包(下载)。tcp适合做http需要顺序传输或者Websocket需要长连接的业务。

5.udp适合简单交互的业务(短“连接”),比如DNS,请求一次后返回就结束了。tcp适合做长连接,存储状态。

文章参考:

(31 封私信 / 80 条消息) 深入解剖io_uring:Linux异步IO的终极武器 - 知乎

代码参考:

https://github.com/0voice

http://www.lryc.cn/news/605238.html

相关文章:

  • Linux零基础Shell教学全集(可用于日常查询语句,目录清晰,内容详细)(自学尚硅谷B站shell课程后的万字学习笔记,附课程链接)
  • Baumer工业相机堡盟工业相机如何通过YoloV8的深度学习模型实现汽车牌照的位置识别(C#代码,UI界面版)
  • 大厂主力双塔模型实践与线上服务
  • SSRF漏洞基础
  • 爬虫验证码处理:ddddocr 的详细使用(通用验证码识别OCR pypi版)
  • Redis 中 key 的过期策略 和 定时器的两种实现方式
  • cocos打包web端需要注意的地方
  • Apache HTTP Server 2.4.50 路径穿越漏洞(CVE-2021-42013)
  • Baumer工业相机堡盟工业相机如何通过YoloV8深度学习模型实现裂缝的检测识别(C#代码UI界面版)
  • 生成式推荐网络架构汇总
  • Java注解与反射:从自定义注解到框架设计原理
  • CHI - Transaction介绍(4) - 原子操作
  • 工厂方法模式:从基础到C++实现
  • Spring Boot 数据源配置中为什么可以不用写 driver-class-name
  • 1. ESP开发之实体按键(KEYPADBUTTON)控制LVGL控件
  • 一文掌握最新版本Monocle3单细胞轨迹(拟时序)分析
  • 【Unity】在构建好的项目里创建自定义文件夹
  • Thales靶机
  • Redis知识点(1)
  • 【力扣热题100】哈希——字母异位词分组
  • 【c++】leetcode763 划分字母区间
  • LeetCode热题100--148. 排序链表--中等
  • 限流算法详解:固定窗口、滑动窗口、令牌桶与漏桶算法全面对比
  • 力扣-543.二叉树的直径
  • 【LeetCode】链表反转实现与测试
  • (补题)小塔的饭
  • sqLite 数据库 (3):以编程方式使用 sqLite,4 个函数,以及 sqLite 移植,合并编译
  • linux 执行sh脚本,提示$‘\r‘: command not found
  • C语言:函数指针、二级指针、常量指针常量、野指针
  • 【Kubernetes 指南】基础入门——Kubernetes 201(二)