当前位置: 首页 > news >正文

Rust Async 异步编程(五):执行器和系统 I/O

Rust Async 异步编程(五):执行器和系统 I/O

  • Rust Async 异步编程(五):执行器和系统 I/O

Rust Async 异步编程(五):执行器和系统 I/O

前面我们一起看过一个使用 Future 从 Socket 中异步读取数据的例子:

pub struct SocketRead<'a> {socket: &'a Socket,
}impl SimpleFuture for SocketRead<'_> {type Output = Vec<u8>;fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {if self.socket.has_data_to_read() {// socket 有数据,写入 buffer 中并返回Poll::Ready(self.socket.read_buf())} else { // socket 中还没数据// 注册一个 wake 函数,当数据可用时,该函数会被调用,// 然后当前 Future 的执行器会再次调用 poll 方法,此时就可以读取到数据self.socket.set_readable_callback(wake);Poll::Pending}}
}

该例子中,Future 将从 Socket 读取数据,若当前还没有数据,则会让出当前线程的所有权,允许执行器去执行其它的 Future。

当数据准备好后,会调用 wake() 函数将该 Future 的任务放入任务通道中,等待执行器的 poll。

关于该流程已经反复讲了很多次,相信大家应该非常清楚了。然而该例子中还有一个疑问没有解决:set_readable_callback 方法到底是怎么工作的?怎么才能知道 socket 中的数据已经可以被读取了?

在现实世界中,该问题往往是通过操作系统提供的 I/O 多路复用机制来完成,例如 Linux 中的 epoll,FreeBSD 和 macOS 中的 kqueue,Windows 中的 IOCP,Fuchisa中的 ports 等(可以通过 Rust 的跨平台包 mio 来使用它们)。借助 I/O 多路复用机制,可以实现一个线程同时阻塞地去等待多个异步 I/O 事件,一旦某个事件完成就立即退出阻塞并返回数据。

相关实现类似于以下代码:

struct IoBlocker { /* ... */ }struct Event {// Event 的唯一 ID,该事件发生后,就会被监听起来id: usize,// 一组需要等待或者已发生的信号signals: Signals,
}impl IoBlocker {/// 创建需要阻塞等待的异步 I/O 事件的集合fn new() -> Self { /* ... */ }/// 对指定的 I/O 事件表示兴趣fn add_io_event_interest(&self,/// 事件所绑定的 socketio_object: &IoObject,event: Event,) { /* ... */ }/// 进入阻塞,直到某个事件出现fn block(&self) -> Event { /* ... */ }
}let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(&socket_1,Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(&socket_2,Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();// 当 socket 的数据可以读取时,打印 "Socket 1 is now READABLE" 
println!("Socket {:?} is now {:?}", event.id, event.signals);

Future 执行者可以使用这些 I/O 多路复用机制提供异步 I/O 对象,例如 socket,它就可以当特定 IO 事件发生时通过配置回调来运行

针对我们 SocketRead 例子,Socket::set_readable_callback 的伪代码大致如下:

impl Socket {fn set_readable_callback(&self, waker: Waker) {let local_executor = self.local_executor;let id = self.id;local_executor.event_map.insert(id, waker);local_executor.add_io_event_interest(&self.socket_file_descriptor,Event { id, signals: READABLE },);}
}

现在,我们就只有一个执行者线程,它可以接收 I/O 事件,并将它们分配到适合的 Waker,这将唤醒相应的任务,并允许执行者在返回检查更多的 I/O 事件之前,驱动更多的任务完成(循环继续…)。

这样,我们只需要一个执行器线程,它会接收 I/O 事件并将其分发到对应的 Waker 中,接着后者会唤醒相关的任务,最终通过执行器 poll 后,任务可以顺利的继续执行,这种 I/O 读取流程可以不停的循环,直到 socket 关闭。

参考:

  1. https://github.com/rustcn-org/async-book
  2. https://www.bilibili.com/video/BV1Ki4y1C7gj
http://www.lryc.cn/news/624166.html

相关文章:

  • Python可视化工具-Bokeh:动态显示数据
  • java_spring boot 中使用 log4j2 及 自定义layout设置示例
  • 【Java后端】MyBatis-Plus 原理解析
  • 股票术语:“支撑位”
  • 链表OJ题讲解---试金石含金量
  • qt svg缺失元素, 原因是不支持 rgba
  • 测试Windows10IoT系统是否可以正常运行KingSCSDA3.8软件
  • JavaScirpt高级程序设计第三版学习查漏补缺(1)
  • JavaScript 中constructor 属性的指向异常问题
  • 【前端面试题】JavaScript核心面试题解析
  • 芋道RBAC实现介绍
  • 软件开发 - foreground 与 background
  • 数据结构与算法之 leetcode 98. 验证二叉搜索树 (前序,中序,后序遍历)
  • React 基础实战:从组件到案例全解析
  • Wasserstein GAN:如何解决GANS训练崩溃,深入浅出数学原理级讲解WGAN与WGAN-GP
  • C语言相关简单数据结构:双向链表
  • 【数据分享】黑龙江省黑土区富锦市土地利用数据
  • 正则表达式实用面试题与代码解析专栏
  • 【Linux系列】常见查看服务器 IP 的方法
  • 如何解决pip安装报错ModuleNotFoundError: No module named ‘imageio’问题
  • Go语言企业级权限管理系统设计与实现
  • 2024年08月13日 Go生态洞察:Go 1.23 发布与全面深度解读
  • pandas series常用函数
  • leetcode热题100——day33
  • Python 内置模块 collections 常用工具
  • (机器学习)监督学习 vs 非监督学习
  • 二分查找(Binary Search)
  • 机器学习算法篇(十三)------词向量转化的算法思想详解与基于词向量转换的文本数据处理的好评差评分类实战(NPL基础实战)
  • 第七十九:AI的“急诊科医生”:模型失效(Loss Explode)的排查技巧——从“炸弹”到“稳定”的训练之路!
  • Tomcat下载、安装及配置详细教程