Rust Async 异步编程(五):执行器和系统 I/O
Rust Async 异步编程(五):执行器和系统 I/O
- Rust Async 异步编程(五):执行器和系统 I/O
Rust Async 异步编程(五):执行器和系统 I/O
前面我们一起看过一个使用 Future 从 Socket 中异步读取数据的例子:
pub struct SocketRead<'a> {socket: &'a Socket,
}impl SimpleFuture for SocketRead<'_> {type Output = Vec<u8>;fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {if self.socket.has_data_to_read() {// socket 有数据,写入 buffer 中并返回Poll::Ready(self.socket.read_buf())} else { // socket 中还没数据// 注册一个 wake 函数,当数据可用时,该函数会被调用,// 然后当前 Future 的执行器会再次调用 poll 方法,此时就可以读取到数据self.socket.set_readable_callback(wake);Poll::Pending}}
}
该例子中,Future 将从 Socket 读取数据,若当前还没有数据,则会让出当前线程的所有权,允许执行器去执行其它的 Future。
当数据准备好后,会调用 wake() 函数将该 Future 的任务放入任务通道中,等待执行器的 poll。
关于该流程已经反复讲了很多次,相信大家应该非常清楚了。然而该例子中还有一个疑问没有解决:set_readable_callback 方法到底是怎么工作的?怎么才能知道 socket 中的数据已经可以被读取了?
在现实世界中,该问题往往是通过操作系统提供的 I/O 多路复用机制来完成,例如 Linux 中的 epoll,FreeBSD 和 macOS 中的 kqueue,Windows 中的 IOCP,Fuchisa中的 ports 等(可以通过 Rust 的跨平台包 mio 来使用它们)。借助 I/O 多路复用机制,可以实现一个线程同时阻塞地去等待多个异步 I/O 事件,一旦某个事件完成就立即退出阻塞并返回数据。
相关实现类似于以下代码:
struct IoBlocker { /* ... */ }struct Event {// Event 的唯一 ID,该事件发生后,就会被监听起来id: usize,// 一组需要等待或者已发生的信号signals: Signals,
}impl IoBlocker {/// 创建需要阻塞等待的异步 I/O 事件的集合fn new() -> Self { /* ... */ }/// 对指定的 I/O 事件表示兴趣fn add_io_event_interest(&self,/// 事件所绑定的 socketio_object: &IoObject,event: Event,) { /* ... */ }/// 进入阻塞,直到某个事件出现fn block(&self) -> Event { /* ... */ }
}let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(&socket_1,Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(&socket_2,Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();// 当 socket 的数据可以读取时,打印 "Socket 1 is now READABLE"
println!("Socket {:?} is now {:?}", event.id, event.signals);
Future 执行者可以使用这些 I/O 多路复用机制提供异步 I/O 对象,例如 socket,它就可以当特定 IO 事件发生时通过配置回调来运行
针对我们 SocketRead 例子,Socket::set_readable_callback 的伪代码大致如下:
impl Socket {fn set_readable_callback(&self, waker: Waker) {let local_executor = self.local_executor;let id = self.id;local_executor.event_map.insert(id, waker);local_executor.add_io_event_interest(&self.socket_file_descriptor,Event { id, signals: READABLE },);}
}
现在,我们就只有一个执行者线程,它可以接收 I/O 事件,并将它们分配到适合的 Waker,这将唤醒相应的任务,并允许执行者在返回检查更多的 I/O 事件之前,驱动更多的任务完成(循环继续…)。
这样,我们只需要一个执行器线程,它会接收 I/O 事件并将其分发到对应的 Waker 中,接着后者会唤醒相关的任务,最终通过执行器 poll 后,任务可以顺利的继续执行,这种 I/O 读取流程可以不停的循环,直到 socket 关闭。
参考:
- https://github.com/rustcn-org/async-book
- https://www.bilibili.com/video/BV1Ki4y1C7gj