当前位置: 首页 > news >正文

【Rust异步】async和await异步编程实战:高并发任务处理全解析

在这里插入图片描述

✨✨ 欢迎大家来到景天科技苑✨✨

🎈🎈 养成好习惯,先赞后看哦~🎈🎈

🏆 作者简介:景天科技苑
🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。
🏆《博客》:Rust开发,Python全栈,Golang开发,云原生开发,PyQt5和Tkinter桌面开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi,flask等框架,云原生K8S,linux,shell脚本等实操经验,网站搭建,数据库等分享。

所属的专栏:Rust高性能并发编程
景天的主页:景天科技苑

在这里插入图片描述

文章目录

  • Rust异步编程
    • 一、Rust 中的异步编程模型
    • 二、async/await 语法和用法
    • 三、Tokio运行时
      • 3.1 异步运行时
      • 3.2 同步原语
        • 3.2.1 Mutex
        • 3.2.2 RwLock
        • 3.2.3 Barrier
        • 3.2.4 Notify
        • 3.2.5 Semaphore
        • 3.2.6 OnceCell
      • 3.3 通道Channel
        • 3.3.1 mpsc
        • 3.3.2 oneshot
        • 3.3.4 broadcast
        • 3.3.5 watch (spmc)
      • 3.4 tokio时间相关
        • 3.4.1 Sleep和Sleep_until
        • 3.4.2 Timeout
        • 3.4.3 Interval
    • 四、futures库
      • 4.1 futures库的历史和生态地位
      • 4.2 Future Trait详解
        • 4.2.1 Future的定义
        • 4.2.2 poll的状态机原理
      • 4.3 Future的状态机原理
      • 4.4 Future的poll方法和Pin/Unpin
        • 4.4.1 为什么需要Pin
        • 4.4.2 Unpin的意义
      • 4.5 futures库的核心Trait
        • 4.5.1 Future Trait
        • 4.5.2 Stream Trait
        • 4.5.3 Sink Trait
        • 4.5.4 AsyncRead/AsyncWrite Trait
      • 4.6 Future的创建
        • 4.6.1 async/await生成Future
        • 4.6.2 手动实现Future
        • 4.6.3 futures::future模块
      • 4.7 Future的组合子(combinator)详解
        • 4.7.1 then、map、and_then
        • 4.7.2 join, join_all, try_join
        • 4.7.3 select
      • 4.8 Future的运行与执行器
        • 4.8.1 手动驱动Future
        • 4.8.2 futures库的执行器
      • 4.9 futures::Stream详解与常用操作
        • 4.9.1 Stream的创建
        • 4.9.2 异步流的处理
        • 4.9.3 Stream与Future互转
      • 4.10 Stream与Sink的协作
      • 4.11 通道(Channel)与异步消息通信
      • 4.12 Future与同步世界的桥梁
      • 4.13 Future取消、超时与select
        • 4.13.1 select选择
        • 4.13.2 超时
        • 4.13.3 取消
      • 4.14 错误处理与Result、Option结合

Rust异步编程

异步编程是一种并发编程模型,通过在任务执行期间不阻塞线程的方式,提高系统的并发能力和响应性。
相比于传统的同步编程,异步编程可以更好地处理 I/O 密集型任务和并发请求,提高系统的吞吐量和性能。
异步编程具有以下优势:
• 提高系统的并发能力和响应速度
• 减少线程等待时间,提高资源利用率
• 可以处理大量的并发请求或任务
• 支持高效的事件驱动编程风格

异步编程广泛应用于以下场景:
• 网络编程:处理大量的并发网络请求
• I/O 密集型任务:如文件操作、数据库访问等
• 用户界面和图形渲染:保持用户界面的流畅响应
• 并行计算:加速复杂计算任务的执行

一、Rust 中的异步编程模型

Rust 作为一门现代的系统级编程语言,旨在提供高效、安全和可靠的异步编程能力。
Rust 异步编程的目标是实现高性能、无安全漏洞的异步应用程序,同时提供简洁的语法和丰富的异步库。

由于并发编程在现代社会非常重要,因此每个主流语言都对自己的并发模型进行过权衡取舍和精心设计,Rust 语言也不例外。
下面的列表可以帮助大家理解不同并发模型的取舍:
• OS 线程, 它最简单,也无需改变任何编程模型 (业务/代码逻辑),因此非常适合作为语言的原生并发模型,
我们在多线程章节也提到过,Rust 就选择了原生支持线程级的并发编程。
但是,这种模型也有缺点,例如线程间的同步将变得更加困难,线程间的上下文切换损耗较大。
使用线程池在一定程度上可以提升性能,但是对于 IO 密集的场景来说,线程池还是不够看。

• 事件驱动 (Event driven), 这个名词你可能比较陌生,如果说事件驱动常常跟回调 ( Callback ) 一起使用,相信大家就恍然大悟了。
这种模型性能相当的好,但最大的问题就是存在回调地狱的风险:非线性的控制流和结果处理导致了数据流向和错误传播变得难以掌控,
还会导致代码可维护性和可读性的大幅降低,大名鼎鼎的 JS 曾经就存在回调地狱。

• 协程 (Coroutines) 可能是目前最火的并发模型,Go 语言的协程设计就非常优秀,这也是 Go 语言能够迅速火遍全球的杀手锏之一。
协程跟线程类似,无需改变编程模型,同时,它也跟 async 类似,可以支持大量的任务并发运行。
但协程抽象层次过高,导致用户无法接触到底层的细节,这对于系统编程语言和自定义异步运行时是难以接受的

• actor 模型 是 erlang 的杀手锏之一,它将所有并发计算分割成一个一个单元,这些单元被称为 actor ,
单元之间通过消息传递的方式进行通信和数据传递,跟分布式系统的设计理念非常相像。
由于 actor 模型跟现实很贴近,因此它相对来说更容易实现,但是一旦遇到流控制、失败重试等场景时,就会变得不太好用

• async/await,该模型性能高,还能支持底层编程,同时又像线程和协程那样无需过多的改变编程模型,
但有得必有失,async 模型的问题就是内部实现机制过于复杂,对于用户来说,理解和使用起来也没有线程和协程简单,
好在前者的复杂性开发者们已经帮我们封装好,而理解和使用起来不够简单,正是本文试图解决的问题。

总之,Rust 经过权衡取舍后,最终选择了同时提供多线程编程和 async 编程:
• 前者通过标准库实现,当你无需那么高的并发时,例如需要并行计算时,可以选择它,
优点是线程内的代码执行效率更高、实现更直观更简单,这块内容已经在多线程章节进行过深入讲解,不再赘述
• 后者通过语言特性 + 标准库 + 三方库的方式实现,在你需要高并发、异步 I/O 时,选择它就对了

异步运行时是 Rust 中支持异步编程的运行时环境,负责管理异步任务的执行和调度。
它提供了任务队列、线程池和事件循环等基础设施,支持异步任务的并发执行和事件驱动的编程模型。
Rust 没有内置异步调用所必须的运行时, 主要的 Rust 异步运行时包括:

• Tokio - Rust 异步运行时的首选, 拥有强大的性能和生态系统。Tokio 提供异步TCP/UDP 套接字、线程池、定时器等功能。
• async-std - 较新但功能完善的运行时, 提供与 Tokio 类似的异步抽象。代码较简洁, 易于上手。
• smol - 一个轻量级的运行时, 侧重 simplicity(简单性)、ergonomics(易用性) 和小巧。
• futures/futures-lite

还有 futuresust 异步编程的基础抽象库。大多数运行时都依赖 futures 提供异步原语。
今日头条是国内使用 Rust 语言的知名公司之一,他们也开源了一个他们的运行时bytedance/monoio
Rust 异步编程模型包含了一些关键的组件和概念,包括:
• 异步函数和异步块:使用 async 关键字定义的异步函数和异步代码块。

// foo()函数经过async修饰,返回一个Future<Output= u8>
// 当调用foo().await时,该Future将被运行,当调用结束后我们将获取一个u8值
async fn foo() -> u8 {5
}
fn bar() -> impl Future<Output = u8> {// 下面的async {}块返回一个 Future<Output = u8>async {let x: u8 = foo().await;x + 5}
}#[tokio::main]
async fn main() {bar().await;
}

async 语句块和 async fn 最大的区别就是前者无法显式的声明返回值,
在大多数时候这都不是问题,但是当配合? 一起使用时,问题就有所不同:

async fn foo() -> Result<u8, String> {Ok(1)
}
async fn bar() -> Result<u8, String> {Ok(1)
}
pub fn main() {let fut = async {foo().await?;bar().await?;Ok(())};
}

以上代码编译后会报错:
在这里插入图片描述

原因在于编译器无法推断出 Result<T, E> 中的 E 的类型,而且编译器的提示consider giving fut a type 你也别傻乎乎的相信,然后尝试半天,最后无奈放弃:
目前还没有办法为 async 语句块指定返回类型。
既然编译器无法推断出类型,那咱就给它更多提示,可以使用 ::< … > 的方式来增加类型注释:

async fn foo() -> Result<u8, String> {Ok(1)
}
async fn bar() -> Result<u8, String> {Ok(1)
}
pub fn main() {let fut = async {foo().await?;bar().await?;Ok::<(), String>(()) //在这一行进行显式类型标注};
}

• await 关键字:在异步函数内部使用 await 关键字等待异步操作完成。
async/.await 是 Rust 语法的一部分,它在遇到阻塞操作时 ( 例如 IO ) 会让出当前线程的所有权而不是阻塞当前线程,这样就允许当前线程继续去执行其它代码,最终实现并发。
async 是懒惰的,直到被执行器 poll 或者 .await 后才会开始运行,其中后者是最常用的运行 Future 的方法。
当 .await 被调用时,它会尝试运行 Future 直到完成,但是若该 Future 进入阻塞,那就会让出当前线程的控制权。
当 Future 后面准备再一次被运行时 (例如从 socket 中读取到了数据),执行器会得到通知,并再次运行该 Future ,如此循环,直到完成。
• Future Trait:表示异步任务的 Future Trait,提供异步任务的执行和状态管理。

pub trait Future {type Output;// Required methodfn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

二、async/await 语法和用法

async 和 await 是 Rust 中用于异步编程的关键字。
async 用于定义异步函数,表示函数体中包含异步代码。
await 用于等待异步操作完成,并返回异步操作的结果。

• 异步函数使用 async 关键字定义,并返回实现了 Future Trait 的类型。
异步函数可以在其他异步函数中使用 await 关键字等待异步操作完成。
调用异步函数时,会返回一个实现了 Future Trait 的对象,可以通过调用.await 方法等待结果。

• 异步块是一种在异步函数内部创建的临时异步上下文,可以使用 async 关键字创建。
异步闭包是一种将异步代码封装在闭包中的方式,可以使用 async 关键字创建。
异步块和异步闭包允许在同步上下文中使用 await 关键字等待异步操作。

异步函数的返回类型通常是实现了 Future Trait 的类型。Future Trait 表示一个异步任务,提供异步任务的执行和状态管理。
Rust 标准库和第三方库中提供了许多实现了Future Trait 的类型,用于表示各种异步操作。

举一个例子,下面这个例子是一个传统的并发下载网页的例子:

fn get_two_sites() {// 创建两个线程执行任务let thread_one = thread::spawn(|| download("https://course.rs"));let thread_two = thread::spawn(|| download("https://fancy.rs"));// 等待两个线程完成thread_one.join().expect("thread one panicked");thread_two.join().expect("thread two panicked");
}

如果是在一个小项目中简单的去下载文件,这么写没有任何问题,但是一旦下载文件的并发请求多起来,那一个下载任务占用一个线程的模式就太重了,
会很容易成为程序的瓶颈。好在,我们可以使用 async 的方式来解决:

async fn get_two_sites_async() {// 创建两个不同的Future,你可以把future理解为未来某个时刻会被执行的任务// 当两个future被同时执行后,它们将并发地去下载目标页面let future_one = download_async("https://www.foo.com");let future_two = download_async("https://www.bar.com");// 同事运行两个future,直至完成join!(future_one, future_two);
}

注意上面的代码必须在一个异步运行时在运行,以便异步运行时使用一定数量的线程来调度这些代码的运行。
接下来我们就学习各种异步运行时库和异步运行时的方法。

三、Tokio运行时

Tokio 是 Rust 异步编程最重要的运行时库, 提供了异步 IO、异步任务调度、同步原语等功能。
Tokio 的主要组件包括:
• tokio - 核心运行时, 提供任务调度,IO 资源等。
• tokio::net - 异步 TCP、UDP 的实现。
• tokio::sync - 互斥量、信号量等并发原语。
• tokio::time - 时间相关工具。
• tokio::fs - 异步文件 IO。

Tokio 是一个基于事件驱动、非阻塞 I/O 的平台,用于使用 Rust 编写异步应用程序。在
高层次上,它提供了一些主要组件:
• 用于处理异步任务的工具,包括同步原语、通道、超时、睡眠和间隔。
• 执行异步 I/O 操作的 API,包括 TCP 和 UDP sockets、文件系统操作以及进程和
信号管理。
• 用于执行异步代码的运行时,包括任务调度器、基于操作系统事件队列的 I/O 驱
动程序(epoll、kqueue、IOCP 等),以及高性能计时器。
本文不介绍 tokio 的异步 I/O、网络编程等功能,而是重点介绍 tokio 的运行时,异步任务调度器和同步原语。

3.1 异步运行时

你可以如下定义 main 函数,它自动支持运行时的启动:

#[tokio::main]
async fn main() {// 运行时中异步执行任务tokio::spawn(async {// do work});// 等待任务完成other_task.await;
}

这个例子 main 函数前必须加 async 关键字,并且加 #[tokio::main] 属性,那么这个 main 就会在异步运行时运行。
你也可以使用显示创建运行时的方法:

pub fn tokio_async() {let rt = tokio::runtime::Runtime::new().unwrap();rt.block_on(async {println!("Hello from tokio!");rt.spawn(async {println!("Hello from a tokio task!");println!("in spawn")}).await.unwrap();});rt.spawn_blocking(|| println!("in spawn_blocking"));
}

首先它创建了一个 Tokio 运行时 rt。block_on 方法在运行时上下文中执行一个异步
任务, 这里我们简单地打印了一句话。
然后使用 rt.spawn 在运行时中异步执行另一个任务。这个任务也打印了几句话。
spawn返回一个 JoinHandle, 所以这里调用.await 来等待任务结束。
最后, 使用 spawn_blocking 在运行时中执行一个普通的阻塞任务。
这个任务会在线程池中运行, 而不会阻塞运行时。
总结一下这个例子展示的要点:
• 在 Tokio 运行时中用 block_on 执行异步任务
• 用 spawn 在运行时中异步执行任务
• 用 spawn_blocking 在线程池中执行阻塞任务
• 可以 awaitJoinHandle 来等待异步任务结束
Tokio 运行时提供了执行和调度异步任务所需的全部功能。通过正确地组合 block_on、spawn 和 spawn_blocking, 可以发挥 Tokio 的强大能力, 实现各种异步场景。

3.2 同步原语

3.2.1 Mutex

这种类型的行为类似于 std::sync::Mutex,但有两个主要区别:lock 是一个异步方法,因此不会阻塞,并且锁守卫(lock guard)设计用于跨越 .await 点而持有。
与流行的观念相反,在异步代码中使用标准库中的普通 Mutex 是可以的,而且通常更可取。
异步互斥锁相对于阻塞互斥锁提供的特性是能够在.await 点上保持锁定。
这使得异步互斥锁比阻塞互斥锁更昂贵,因此在可以使用阻塞互斥锁的情况下应首选阻塞互斥锁。
异步互斥锁的主要用例是提供对诸如数据库连接等 IO 资源的共享可变访问。
如果互斥锁后面的值只是数据,通常可以使用标准库或 parking_lot 中的阻塞互斥锁。
请注意,尽管在任务不能在线程之间移动的情况下,编译器不会阻止 std::Mutex 在.await点上保持其卫兵,
但在实践中,这几乎永远不会导致正确的并发代码,因为它很容易导致死锁。
一个常见的模式是将 Arc<Mutex<. . . » 包装在一个结构体中,为其中的数据提供非异步方法,仅在这些方法内部锁定互斥锁。
mini-redis 示例提供了这种模式的示例。
此外,当确实需要对 IO 资源进行共享访问时,通常最好生成一个任务来管理该 IO 资源,并使用消息传递与该任务进行通信。

下面是一个 Mutex 的例子:

use tokio::sync::Mutex;
use std::sync::Arc;#[tokio::main]
async fn main() {// 用 Arc 包裹以实现多任务共享//初始值为0let data = Arc::new(Mutex::new(0));// 克隆 Arc 以供多个任务使用let data1 = Arc::clone(&data);let data2 = Arc::clone(&data);// 创建两个任务//第一个任务加1let handle1 = tokio::spawn(async move {let mut lock = data1.lock().await;*lock += 1;println!("Task 1: {}", *lock);});//第二个任务加2let handle2 = tokio::spawn(async move {let mut lock = data2.lock().await;*lock += 2;println!("Task 2: {}", *lock);});// 等待任务完成handle1.await.unwrap();handle2.await.unwrap();// 输出最终结果为3println!("Final: {}", *data.lock().await);
}

在这里插入图片描述

代码分析
这里我们创建了一个初始值为 0 的共享变量,并用 Arc<Mutex<i32>> 包裹起来以实现线程安全的并发共享。
🔁 两个任务执行:
这两个任务使用 tokio::spawn 并发执行,但 .lock().await 是 异步互斥的,一次只能有一个任务进入临界区。
因此:
task1 获取锁后,把值从 0 加到 1。
task2 等待锁,获取后再从 1 加到 3。
主线程等待两个任务都完成后,打印最终值为 3。

📌 注意事项
必须 .await .lock():
let mut guard = mutex.lock().await;
这是一个异步操作,会在互斥锁被释放前挂起当前任务。
避免持有锁太久:
长时间持有锁可能会阻塞其他任务进入该临界区。尽量缩小 .lock().await 的作用域。

与 Arc 结合:
如果多个任务共享数据,需要通过 Arc<Mutex> 来实现线程安全共享。

⛔ 常见误区

  1. 在 lock().await 后 panic 了
    let guard = data.lock().await;
    panic!(“oops”); // 这里会导致锁被 drop,其他任务可继续访问

虽然 tokio::sync::Mutex 是可恢复的(不是毒锁),但仍建议避免在持锁状态下 panic。

3.2.2 RwLock

这种类型的锁允许同时存在多个读取者或最多一个写入者。
这种锁的写入部分通常允许修改底层数据(独占访问),而读取部分通常允许只读访问(共享访问)。
相比之下,互斥锁不区分获取锁的读取者或写入者,因此导致等待锁的任何任务都必须放弃。
RwLock 将允许任意数量的读取者获取锁,只要没有写入者持有锁。
Tokio 的读写锁的优先级策略是公平的(或偏向写入),以确保读取者不能使写入者饥饿。
通过使用一个先进先出队列来确保公平性,等待锁的任务不会分发读取锁,直到写锁被释放。
这与 Rust 标准库的 std::sync::RwLock 不同,后者的优先级策略取决于操作系统的实现。
类型参数 T 表示此锁所保护的数据。要在线程之间共享,要求 T 满足 Send。
从锁定方法返回的 RAII 卫兵实现了 Deref(对于写入方法还实现了 DerefMut),以允许访问锁的内容。

okio::sync::RwLock 提供了一种异步读写锁机制,允许多个任务并发读取或独占写入某个共享值。它的行为类似于标准库中的 std::sync::RwLock,但支持异步等待。

RwLock(读写锁)允许:
多个读者(readers)同时持有读锁(read().await)
但只有一个写者(writer)能持有写锁(write().await),且在写锁持有期间不能有其他读者或写者。

使用示例
示例 1:读写锁的基本用法

use tokio::sync::RwLock;
use std::sync::Arc;#[tokio::main]
async fn main() {let data = Arc::new(RwLock::new(5));// 读锁{let r1 = data.read().await;let r2 = data.read().await;println!("两个读者同时读:{} 和 {}", *r1, *r2);}// 写锁{let mut w = data.write().await;*w += 1;println!("写者将值加一:{}", *w);}// 再次读{let r = data.read().await;println!("新的值是:{}", *r);}
}

在这里插入图片描述

与 Arc 搭配共享数据
由于 Tokio 的 RwLock 不是 Copy 或 Clone,所以通常我们使用 Arc<RwLock> 来在多个任务之间共享它。
示例 2:多任务共享写入

use tokio::sync::RwLock;
use std::sync::Arc;#[tokio::main]
async fn main() {// 创建一个可读写的锁let counter = Arc::new(RwLock::new(0));// 创建一个任务句柄的向量let mut handles = vec![];// 创建10个任务for _ in 0..10 {let counter_clone = counter.clone();// 创建多个任务,每个任务都尝试增加计数器的值,每次增加1let handle = tokio::spawn(async move {let mut w = counter_clone.write().await;*w += 1;});handles.push(handle);}// 等待所有任务完成for handle in handles {handle.await.unwrap();}let result = counter.read().await;println!("最终计数器值:{}", *result); // 应该是 10
}

在这里插入图片描述

注意事项
✅ 使用 .await
所有的读写操作必须使用 .await,因为它们可能会暂停当前任务直到锁可用。

⚠️ 避免死锁
不要在持有锁期间执行可能阻塞很久的 .await 操作(比如网络请求),否则会导致死锁或性能下降。

错误示例(容易死锁):

let mut w = lock.write().await;
// ❌ 锁未释放,下面的 await 操作可能会阻塞其他等待这个锁的任务
some_async_function().await;

正确写法:

let result = {let mut w = lock.write().await;// 在这里先修改值*w += 1;*w // 或 clone 出值
}; // 提前释放锁
some_async_function().await;

替代选择
如果你不需要异步上下文,可以用 std::sync::RwLock。
如果数据结构很小或访问冲突很少,可以用 tokio::sync::Mutex 代替 RwLock,实现更简单。

3.2.3 Barrier

tokio::sync::Barrier 是一个异步屏障同步原语,适用于多个异步任务(或线程)在某个同步点会合后再一起继续执行的场景。
它的作用和标准库中的 std::sync::Barrier 类似,但是异步版本,可以和 Tokio 运行时协同工作。
用途概述
Barrier 会阻塞调用 .wait().await 的任务,直到有 足够数量 的任务到达,然后这些任务会一起继续执行。
比如,创建一个 Barrier,要求 3 个任务会合:
let barrier = Arc::new(Barrier::new(3));

然后 3 个任务都调用 .wait().await,当第 3 个任务也调用 .wait() 时,这三个任务才会继续往下执行。

基本用法示例

use std::sync::Arc;
use tokio::sync::Barrier;
use tokio::task;#[tokio::main]
async fn main() {let barrier = Arc::new(Barrier::new(3)); // 需要3个任务同步for i in 0..3 {let b = barrier.clone();task::spawn(async move {println!("任务 {} 到达屏障", i);let wait_result = b.wait().await; // 等待其他任务到达屏障if wait_result.is_leader() {println!("任务 {} 是 leader", i);}println!("任务 {} 继续执行", i);});}// 给任务时间执行完tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}

在这里插入图片描述

Barrier::wait() 返回值详解

let wait_result = barrier.wait().await;
if wait_result.is_leader() {// 只有一个任务的 is_leader() 为 true
}

.wait() 会返回一个 BarrierWaitResult
该结果的 .is_leader() 为 true 的任务表示它是这批任务中的“leader”,可以用来做一些一次性的初始化或工作。

常见应用场景

  1. 同时启动多个任务
    比如,多个客户端准备好后一起发起请求:
let barrier = Arc::new(Barrier::new(n_clients + 1)); // 多加1是因为主任务也要 wait
for _ in 0..n_clients {let b = barrier.clone();tokio::spawn(async move {b.wait().await;// 所有任务同时开始do_some_work().await;});
}barrier.wait().await; // 主任务等所有子任务都准备好

注意事项
Barrier 是可重用的:多轮同步是可以的,只要参与者数量一致。
使用时需要 Arc 包装共享。
不会自动取消等待任务,若其中一个任务挂了,其他任务可能会永远等待。

完整例子(多个任务并发模拟)

use tokio::sync::Barrier;
use std::sync::Arc;
use tokio::time::{ sleep, Duration };#[tokio::main]
async fn main() {let barrier = Arc::new(Barrier::new(5));for i in 0..5 {let b = barrier.clone();tokio::spawn(async move {println!("任务 {} 初始化完成", i);sleep(Duration::from_millis(100 * i)).await;println!("任务 {} 等待屏障", i);let wait = b.wait().await;if wait.is_leader() {println!("任务 {} 是 leader", i);}println!("任务 {} 开始执行主逻辑", i);});}sleep(Duration::from_secs(2)).await;
}

在这里插入图片描述

3.2.4 Notify

tokio::sync::Notify 是 Tokio 提供的一种轻量级通知机制,适用于多个异步任务之间的单向唤醒。它的使用场景类似于 Condvar,但它是为 async 设计的。
✅ Notify 适合的场景
一个任务等待某个条件满足(比如数据准备好)
另一个任务通知它继续执行
可以多个等待者,但唤醒行为为 一次唤醒一个,先来先唤醒

📦 引用方式

use tokio::sync::Notify;
use std::sync::Arc;

Notify 本身不是 Clone,但可以通过 Arc<Notify> 在多任务之间共享。

📘 示例 1:单个通知者,单个等待者

use tokio::sync::Notify;
use std::sync::Arc;
use tokio::time::{ sleep, Duration };#[tokio::main]
async fn main() {// 创建通知对象let notify = Arc::new(Notify::new());let notify2 = notify.clone();// 等待者任务let waiter = tokio::spawn(async move {println!("等待通知...");notify2.notified().await; // 等待通知println!("收到通知,继续执行");});// 模拟处理耗时任务sleep(Duration::from_secs(2)).await;println!("发送通知");notify.notify_one(); // 发送通知waiter.await.unwrap(); // 等待任务完成
}

在这里插入图片描述

📘 示例 2:多个等待者

use tokio::sync::Notify;
use std::sync::Arc;
use tokio::time::{ sleep, Duration };#[tokio::main]
async fn main() {let notify = Arc::new(Notify::new());// 启动多个等待者for i in 0..3 {let notify_clone = notify.clone();tokio::spawn(async move {println!("任务 {} 正在等待...", i);notify_clone.notified().await;println!("任务 {} 被唤醒!", i);});}sleep(Duration::from_secs(1)).await;// 一次只能唤醒一个任务println!("第一次 notify_one");notify.notify_one();sleep(Duration::from_secs(1)).await;println!("第二次 notify_one");notify.notify_one();sleep(Duration::from_secs(1)).await;println!("第三次 notify_one");notify.notify_one();sleep(Duration::from_secs(1)).await;
}

在这里插入图片描述

📘 示例 3:notify_waiters() 唤醒所有等待者

notify.notify_waiters(); // 所有在等待 .notified().await 的任务都会被唤醒

❗注意事项
.notified().await 是挂起等待直到 notify_one() 或 notify_waiters() 被调用。
如果 notify_one() 在调用 notified().await 之前发生,下一次 .notified().await 会立即完成(类似一个“令牌”机制)。
.notified() 不会排队等待多次 notify,因此 notify 过后必须立刻进入 .await,不然容易漏掉通知。

3.2.5 Semaphore

tokio::sync::Semaphore 是 Tokio 提供的一个异步信号量(计数器),用于限制并发访问某个资源的数量,通常用于限流控制、连接数控制等场景。

✅ 基本概念
信号量维护一个许可(permit)计数,调用 .acquire().await 会尝试获取一个许可,如果没有许可可用,则该任务会被挂起直到有许可释放。

📌 示例一:限制并发数量
我们模拟一个场景:最多允许 3 个任务同时访问共享资源。

use tokio::sync::Semaphore;
use std::sync::Arc;
use tokio::time::{ sleep, Duration };#[tokio::main]
async fn main() {let semaphore = Arc::new(Semaphore::new(3)); // 最多3个并发let mut handles = vec![];for i in 0..10 {let sem_clone = semaphore.clone();let handle = tokio::spawn(async move {// 获取一个 permitlet permit = sem_clone.acquire().await.unwrap();println!("任务 {} 开始", i);sleep(Duration::from_secs(2)).await;println!("任务 {} 结束", i);drop(permit); // 显式释放许可(也可以自动 drop)});handles.push(handle);}// 等待所有任务完成for handle in handles {handle.await.unwrap();}
}

每次只能并发3个任务
在这里插入图片描述

📌 示例二:try_acquire(非阻塞)

if let Ok(permit) = semaphore.try_acquire() {// 成功获取许可// 使用资源drop(permit);
} else {// 无法立即获取许可
}

📌 示例三:acquire_owned(获取一个可跨线程的许可)
let permit = semaphore.clone().acquire_owned().await.unwrap();
// permit 是 SemaphorePermitOwned 类型,可以跨线程使用
适用于你想把 permit 移动到别的线程或 async task 中。

🔁 示例四:一次获取多个许可

let permits = semaphore.acquire_many(2).await.unwrap();
// 获取两个许可
drop(permits);

如果你初始化 Semaphore::new(3),那 acquire_many(4) 将会等待直到足够许可释放。

⚠️ 注意事项
Semaphore 不会自动恢复许可数量,必须手动 drop(permit)。
若 .acquire() 的返回值没有被持有(或者提前被 drop),许可会立即释放。
Semaphore 本身是 Sync + Send,适合用于跨任务共享。

3.2.6 OnceCell

tokio::sync::OnceCell 是 Tokio 提供的一个异步版本的单次初始化容器,
类似于标准库中的 std::sync::OnceLock 或 once_cell::sync::OnceCell,但支持 异步初始化,即可以在异步上下文中安全地初始化一次,并在多个任务之间共享该值。

使用场景
适用于:
惰性初始化全局变量;
异步初始化依赖,如数据库连接、配置读取等;
多线程/多任务安全地共享只初始化一次的数据。

常用 API

use tokio::sync::OnceCell;impl<T> OnceCell<T> {pub const fn new() -> Selfpub async fn get_or_init<F, Fut>(&self, f: F) -> &TwhereF: FnOnce() -> Fut,Fut: Future<Output = T>pub fn get(&self) -> Option<&T>pub fn set(&self, value: T) -> Result<(), T>
}

示例
示例 1:异步惰性初始化全局变量(如配置)

use tokio::sync::OnceCell;
use std::time::Duration;//定义静态变量
static CONFIG: OnceCell<String> = OnceCell::const_new();async fn load_config() -> String {// 模拟异步加载tokio::time::sleep(Duration::from_secs(1)).await;"配置加载完成".to_string()
}async fn get_config() -> &'static String {CONFIG.get_or_init(load_config).await
}#[tokio::main]
async fn main() {let c1 = get_config().await;println!("第一次: {}", c1);let c2 = get_config().await;println!("第二次: {}", c2);
}

输出只会初始化一次(延迟 1 秒),之后立即返回结果。
在这里插入图片描述

示例 2:在结构体中使用

use tokio::sync::OnceCell;struct AppState {db: OnceCell<String>,
}impl AppState {fn new() -> Self {Self { db: OnceCell::new() }}// 获取数据库连接async fn get_db(&self) -> &String {self.db.get_or_init(|| async {// 模拟连接数据库tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;"数据库连接".to_string()}).await}
}#[tokio::main]
async fn main() {let app = AppState::new();println!("DB: {}", app.get_db().await);
}

在这里插入图片描述

示例 3:配合 set

use tokio::sync::OnceCell;#[tokio::main]
async fn main() {let cell = OnceCell::new();cell.set(123).unwrap();assert_eq!(cell.get(), Some(&123));// 第二次设置会失败assert!(cell.set(456).is_err());
}

注意:set() 是同步方法,不能用于异步初始化逻辑。如果初始化需要异步,建议使用 get_or_init()。

注意事项
OnceCell::get_or_init() 是 异步安全 的,多任务并发调用时只有一个初始化任务会被执行;
不要在初始化闭包中再次调用 get_or_init() 以防死锁;
它是 Send + Sync + 'static 的,只要内部类型满足这些要求。

3.3 通道Channel

在 Tokio 程序中,最常见的同步形式是消息传递。两个任务独立运行,并通过发送消息来进行同步。这样做的好处是避免了共享状态。
消息传递是通过通道实现的。通道支持从一个生产者任务发送消息到一个或多个消费者任务。
Tokio 提供了几种不同类型的通道。每种通道类型支持不同的消息传递模式。
当一个通道支持多个生产者时,许多独立的任务可以发送消息。当一个通道支持多个消费者时,许多不同的独立任务可以接收消息。
由于不同的消息传递模式最好使用不同的实现,Tokio 提供了许多不同类型的通道。

3.3.1 mpsc

mpsc 通道支持从多个生产者向单个消费者发送多个值。这种通道通常用于将工作发送到任务或接收多个计算的结果。
如果要从单个生产者向单个消费者发送多条消息,也应使用此通道。没有专用的 spsc通道。
tokio::sync::mpsc 是 Tokio 提供的 多生产者、单消费者 异步通道(multi-producer, single-consumer channel),常用于任务之间的异步通信。

🔧 基本用法

use tokio::sync::mpsc;
use tokio::time::{ sleep, Duration };#[tokio::main]
async fn main() {// 创建一个通道,buffer 大小为 32let (tx, mut rx) = mpsc::channel(32);// 启动生产者任务tokio::spawn(async move {for i in 0..5 {if tx.send(i).await.is_err() {println!("接收者已关闭");return;}println!("发送: {}", i);sleep(Duration::from_millis(500)).await;}});// 消费者接收消息while let Some(value) = rx.recv().await {println!("接收: {}", value);}println!("通道已关闭");
}

mpsc::channel 的参数是通道的容量。这是在任何给定时间可以存储在通道中等待接收的值的最大数量。
正确设置此值对于实现健壮的程序至关重要,因为通道容量在处理背压方面起着关键作用。
在这里插入图片描述

🔁 多个发送者
可以通过 tx.clone() 创建多个发送者:

let tx1 = tx.clone();
tokio::spawn(async move {tx1.send("来自 tx1").await.unwrap();
});tokio::spawn(async move {tx.send("来自 tx").await.unwrap();
});

📌 特点
在这里插入图片描述

🔚 通道关闭行为
所有发送者被 drop 后,接收端 recv() 返回 None
接收端被 drop 后,发送者的 send().await 会返回错误

🧪 简单示例:检测接收端关闭

use tokio::sync::mpsc;#[tokio::main]
async fn main() {//创建一个单消费者通道let (tx, rx) = mpsc::channel::<i32>(1);drop(rx); // 关闭接收端// 尝试发送//当接收端关闭时,发送会返回错误match tx.send(123).await {Ok(_) => println!("发送成功"),Err(e) => println!("发送失败: {}", e), // 打印错误信息}
}

在这里插入图片描述

🧵 接收端只能在一个任务中用?
是的,Tokio 的 mpsc::Receiver 不是线程安全的(!Sync),不能在多个任务中并发调用 .recv().await,
你需要将所有逻辑集中在一个消费者任务中,或使用 tokio::sync::broadcast / tokio::sync::watch 等其它适合多消费者的方案。

3.3.2 oneshot

tokio::sync::oneshot 是 Tokio 提供的一种 单次发送单次接收 的异步通信方式,适用于两个异步任务之间的一次性结果传递,例如在一个任务完成后通知另一个任务结果。
单次通道支持从单个生产者向单个消费者发送单个值。通常,这种通道用于将计算的结果发送给等待者。
通过 oneshot::channel() 创建:
let (tx, rx) = oneshot::channel();

tx: 发送端(Sender)
rx: 接收端(Receiver)
发送者可以发送一次消息(值),接收者可以接收一次消息(异步操作)。

✅ 用法示例
示例 1:基本用法

use tokio::sync::oneshot;#[tokio::main]
async fn main() {// 创建一个单次发送通道let (tx, rx) = oneshot::channel();// 启动一个异步任务,发送结果tokio::spawn(async move {// 模拟异步计算let result = 42;tx.send(result).unwrap(); // 发送结果});let value = rx.await.unwrap(); // 接收结果println!("Received: {}", value);
}

在这里插入图片描述

示例 2:错误处理

use tokio::sync::oneshot;#[tokio::main]
async fn main() {let (tx, rx) = oneshot::channel::<i32>();// 发送端提前被 drop 掉drop(tx);match rx.await {Ok(val) => println!("Received: {}", val),Err(_) => println!("发送端已被 drop, 未收到任何值"),}
}

在这里插入图片描述

📂 应用场景

  1. 任务之间结果通知
    适合某个异步任务计算完成后,通知主任务或其他任务处理结果。
async fn async_worker(tx: oneshot::Sender<String>) {// 做一些事...let result = "done".to_string();let _ = tx.send(result);
}
  1. 异步请求-响应模型
    可以配合通道 + ID 做异步请求响应配对。

📖 方法详解

Sender<T>
send(val: T) -> Result<(), T>
发送值。只能调用一次,失败会返回原值(接收端被 drop 时)。let _ = sender.send(100)?;Receiver<T>: Future<Output = Result<T, Canceled>>
await 获取发送的值
Err(Canceled) 表示发送端已被 drop
match receiver.await {Ok(val) => { /* 使用 val */ }Err(_) => { /* 发送端已消失 */ }
}

❗ 注意事项
在这里插入图片描述

🧪 示例:结合超时使用

use tokio::{ sync::oneshot, time::{ sleep, Duration } };#[tokio::main]
async fn main() {let (tx, rx) = oneshot::channel();tokio::spawn(async move {sleep(Duration::from_secs(2)).await;let _ = tx.send("hello");});tokio::select! {res = rx => println!("收到: {:?}", res),_ = sleep(Duration::from_secs(1)) => println!("超时未收到消息"),}
}

在这里插入图片描述

3.3.4 broadcast

tokio::sync::broadcast 是 Tokio 提供的一种 多生产者、多消费者的广播通道(broadcast channel),允许一个消息被发送后,被所有活跃的接收者(subscriber)接收。
适用场景:一个发送者向多个接收者广播消息(如:通知系统、事件驱动框架中的事件广播、关闭通知等)。
🧱 基本结构

let (tx, rx) = broadcast::channel::<T>(buffer_size);tx: 广播通道的发送端 (Sender<T>)
rx: 广播通道的接收端 (Receiver<T>)
buffer_size: 环形缓冲区的大小,保存最近的 N 条消息

特点:
所有接收者都共享同一个缓冲区副本,每个 receiver 有独立的“指针”。
如果发送速度过快、receiver 没有及时接收,会造成 receiver 报错 Lagged(n),意味着丢了 n 条消息。

✍ 示例代码(单发送者多接收者)

use tokio::sync::broadcast;
use tokio::task;#[tokio::main]
async fn main() {let (tx, _) = broadcast::channel::<String>(16); // 创建一个容量为16的通道// 创建多个接收者for i in 0..3 {let mut rx = tx.subscribe(); // 每个接收者都调用 subscribe()task::spawn(async move {while let Ok(msg) = rx.recv().await {println!("Receiver {} got: {}", i, msg);}});}// 发送者发送多条消息tx.send("hello".to_string()).unwrap();tx.send("world".to_string()).unwrap();// 等待一段时间,让接收者有足够的时间接收消息tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}

在这里插入图片描述

🧠 方法详解

  1. broadcast::channel<T>(usize) -> (Sender<T>, Receiver<T>)
    创建一个新的广播通道,传入环形缓冲区大小。

  2. Sender<T>::send(value: T) -> Result<usize, SendError<T>>
    广播一条消息,返回成功接收到的订阅者数量。
    如果没有 receiver 存活,也会成功发送(但没人接收)

  3. Sender<T>::subscribe() -> Receiver<T>
    创建一个新的接收者。新接收者可以从当前时刻起接收新的消息。

  4. Receiver<T>::recv().await -> Result<T, RecvError>
    异步接收广播消息:
    返回 Ok(msg):成功接收到消息
    返回 Err(Lagged(n)):丢失了 n 条消息(可能发送过快,buffer 被覆盖)
    返回 Err(Closed):所有发送者已被 drop,通道关闭

  5. Receiver<T>::try_recv() -> Result<T, TryRecvError>
    非阻塞接收消息。

  6. Receiver<T>::recv_ref().await -> Result<&T, RecvError>
    返回引用而不是移动所有权(需要 'static 生命周期或更复杂的要求,使用较少)。

🔁 消息丢失与 Lagged 错误
广播通道是有限大小的环形缓冲区。如果发送速度过快、接收者来不及处理,接收者将收到如下错误:
Err(broadcast::error::RecvError::Lagged(n))
表示 receiver 的“光标”已经滞后 n 条消息。

🧵 多发送者(clone)
可以通过 .clone() 创建多个发送者:

let tx2 = tx.clone();
tx2.send("message".to_string()).unwrap();

🛑 关闭通道(drop 所有 sender)
当所有 Sender 被 drop 后,所有 Receiver.recv().await 会返回 Err(RecvError::Closed)。

📌 应用场景示例
✅ 关闭信号广播

use tokio::sync::broadcast;
use tokio::time::{ sleep, Duration };#[tokio::main]
async fn main() {let (shutdown_tx, _) = broadcast::channel::<()>(1);let mut handles = Vec::new();for i in 0..3 {let mut shutdown_rx = shutdown_tx.subscribe();let handle = tokio::spawn(async move {loop {tokio::select! {// 接收关闭信号//上面用了死循环,只有这里接收到关闭信号才会退出_ = shutdown_rx.recv() => {println!("Worker {} received shutdown", i);break;}_ = sleep(Duration::from_secs(1)) => {println!("Worker {} working...", i);}}}});handles.push(handle);}sleep(Duration::from_secs(3)).await;// 发送关闭信号shutdown_tx.send(()).unwrap();// 等待所有任务退出for handle in handles {let _ = handle.await;}
}

在这里插入图片描述

❗ 注意事项
通道容量太小,容易 Lagged。
每个接收者只能接收从订阅开始之后的消息。
不适用于顺序处理、确认投递的场景(用 mpsc 更合适)。

3.3.5 watch (spmc)

tokio::sync::watch 是 Tokio 提供的一种用于 单生产者-多消费者 的异步广播通道。
它的特点是只保留 最新的一条消息(值),适合传递“配置变更”、“状态通知”等最新状态的场景。

观察通道支持从单个生产者向多个消费者发送多个值。然而,通道中仅存储最新的值。
当发送新值时,会通知消费者,但不能保证消费者会看到所有的值。
观察通道类似于容量为 1 的广播通道。
观察通道的用例包括广播配置更改或发出程序状态更改的信号,例如切换到关闭状态。
下面这个例子使用观察通道通知任务配置更改。在此示例中,定期检查配置文件。当文件更改时,将向消费者发出配置更改的信号。

基本特性
单发送者,多接收者
每次只保留最后一个值
所有接收者都可以收到最新的值
接收者在调用 .changed().await 后可以用 .borrow() 获取值
如果发送者被 drop,.changed().await 会返回 Err

使用场景
配置热更新(config reloading)
状态传播(如系统状态 Ready/NotReady)
控制标志(如 shutdown、pause/resume)

API 结构

use tokio::sync::watch;
let (tx, rx) = watch::channel(initial_value);tx: 类型为 watch::Sender<T>
rx: 类型为 watch::Receiver<T>

示例代码详解
示例 1:基本用法(发送者更新状态,接收者监听变化)

use tokio::sync::watch;
use tokio::time::{ sleep, Duration };#[tokio::main]
async fn main() {let (tx, mut rx) = watch::channel("initial");// 启动一个异步任务,监听值的变化tokio::spawn(async move {loop {if rx.changed().await.is_err() {println!("发送端已关闭");break;}println!("收到新值: {}", *rx.borrow());}});sleep(Duration::from_secs(1)).await;tx.send("updated-1").unwrap();sleep(Duration::from_secs(1)).await;tx.send("updated-2").unwrap();// 发送端 drop 后,接收端将收到错误drop(tx);sleep(Duration::from_secs(1)).await;
}

多次发送,接收的是最新值
在这里插入图片描述

示例 2:多接收者共享最新值

use tokio::sync::watch;
use tokio::time::{ sleep, Duration };#[tokio::main]
async fn main() {let (tx, rx) = watch::channel(0);for i in 0..3 {let mut rx_clone = rx.clone();tokio::spawn(async move {while let Ok(_) = rx_clone.changed().await {println!("接收者 {} 收到 {}", i, *rx_clone.borrow());}});}for n in 1..=5 {tx.send(n).unwrap();sleep(Duration::from_millis(500)).await;}
}

在这里插入图片描述

示例 3:用于 shutdown 通知

use tokio::sync::watch;
use tokio::time::{ sleep, Duration };#[tokio::main]
async fn main() {let (shutdown_tx, mut shutdown_rx) = watch::channel(false);let worker = tokio::spawn(async move {loop {tokio::select! {_ = shutdown_rx.changed() => {if *shutdown_rx.borrow() {println!("收到关闭信号, worker退出");break;}}}}});//等待2秒后发送关闭信号sleep(Duration::from_secs(2)).await;shutdown_tx.send(true).unwrap();worker.await.unwrap();
}

在这里插入图片描述

主要方法说明
在这里插入图片描述

在这里插入图片描述

注意事项
值必须实现 Clone + Send + Sync + 'static
.changed().await 必须调用才能看到新值,否则 .borrow() 一直是旧的。
watch 是最适合 状态广播 的场景,不适合数据队列用途(用 mpsc 或 broadcast)。
.send() 不阻塞,永远成功,只是替换旧值。

与 broadcast 的区别
在这里插入图片描述

3.4 tokio时间相关

用于跟踪时间的实用工具。
该模块提供了一些类型,用于在一段时间后执行代码。
• Sleep: 是一个在特定瞬间完成 future,不执行任何工作。
• Interval: 是一个以固定周期生成值的流。它以 Duration 初始化,并在每次经过该持续时间后重复生成。
• Timeout: 包装一个 future 或流,设置其允许执行的时间上限。如果 future 或流未能在规定时间内完成,则它将被取消,并返回错误。
这些类型足以处理许多涉及时间的场景。
这些类型必须在 Tokio 的 Runtime 的上下文中使用。

3.4.1 Sleep和Sleep_until

在 Rust 的异步运行时 Tokio 中,tokio::time::sleep 和 tokio::time::sleep_until 都用于异步地“睡眠”(延时),即让当前任务在指定时间之后再继续执行。
它们是构建异步定时器、延时处理等功能的基础工具。
等待直到持续时间已过。
相当于 sleep_until(Instant::now() + duration)。是 std::thread::sleep 的异步对应。
在等待 sleep future 完成时不执行任何工作。Sleep 以毫秒为粒度运行,不应用于需要高分辨率定时器的任务。
实现是平台特定的,一些平台(特别是 Windows)将提供比 1毫秒更大分辨率的定时器。

tokio::time::sleep
作用
让当前异步任务在指定的时间段后再继续执行。

函数签名
pub fn sleep(duration: Duration) -> Sleep

示例

use tokio::time::{sleep, Duration};#[tokio::main]
async fn main() {println!("开始等待");sleep(Duration::from_secs(3)).await;println!("3秒后执行");
}

特点
参数是一个 Duration 类型。
睡眠时间是从调用 sleep() 的那一刻开始算起的。

tokio::time::sleep_until
作用
让当前异步任务睡到一个具体的时间点(而不是等待一个时间段)。

函数签名
pub fn sleep_until(deadline: Instant) -> Sleep

示例

use tokio::time::{sleep_until, Instant, Duration};#[tokio::main]
async fn main() {let deadline = Instant::now() + Duration::from_secs(5);println!("现在时间: {:?}", Instant::now());sleep_until(deadline).await;println!("5秒后的时间点到了: {:?}", Instant::now());
}

特点
参数是 Instant,表示一个具体的时刻。
如果目标时间早于当前时间,则会立即唤醒。

sleep vs sleep_until 区别总结
在这里插入图片描述

配合 tokio::time::interval 使用场景对比
如果你要每隔 5 秒执行一次任务,建议配合 tokio::time::interval;
如果你只要在未来某个时间点执行一次,用 sleep_until;
如果你要延迟执行某段逻辑,用 sleep 更直观。

取消与提前唤醒(高级)
两者返回的都是一个 Sleep 类型,它实现了 Future,你也可以用 tokio::select! 来取消等待:

use tokio::time::{sleep, Duration};#[tokio::main]
async fn main() {tokio::select! {_ = sleep(Duration::from_secs(10)) => {println!("睡眠完成");}_ = async {// 模拟外部取消信号sleep(Duration::from_secs(3)).await;println!("提前取消了任务");} => {}}
}

注意事项
sleep() 不会阻塞线程,而是让出执行权。
Instant::now() 与 std::time::Instant::now() 不是同一个,Tokio 的是可暂停时间(适用于测试)。
如果在 #[tokio::test] 中使用 sleep 进行异步延时测试,请注意使用 tokio::time::pause()、advance() 等控制时间(以避免测试太慢)。

tokio::select! 配合 sleep 实现并发等待
你可以用 tokio::select! 来同时监听多个 sleep() 或者 sleep_until(),从而实现:
多个任务中的最先触发
可取消 sleep 的逻辑

use tokio::time::{ sleep, Duration };#[tokio::main]
async fn main() {tokio::select! {_ = sleep(Duration::from_secs(3)) => {println!("任务1完成");}_ = sleep(Duration::from_secs(5)) => {println!("任务2完成");}}
}

在这里插入图片描述

使用 sleep_until 构建精确定时器
如果你要精确控制某个时间点触发(而不是每次偏移),sleep_until 是最佳选择。
示例:每秒执行一次,但不漂移

use tokio::time::{ Instant, sleep_until, Duration };#[tokio::main]
async fn main() {let mut now = Instant::now();for _ in 0..5 {now += Duration::from_secs(1);sleep_until(now).await;println!("精确触发:{:?}", Instant::now());}
}

在这里插入图片描述

和 interval() 的最大不同是:你手动控制时间点,确保“钟点”一致,不因任务执行慢而漂移。

3.4.2 Timeout

timeout 用于给某个 Future 设置最大等待时间,如果超时未完成,将返回 Elapsed 错误。
示例:

use tokio::time::{timeout, sleep, Duration};#[tokio::main]
async fn main() {let result = timeout(Duration::from_secs(2), async {sleep(Duration::from_secs(5)).await;"任务完成"}).await;match result {Ok(msg) => println!("{}", msg),Err(_) => println!("任务超时"),}
}

在这里插入图片描述

3.4.3 Interval

用于每隔固定时间执行任务(适合心跳、周期性数据拉取等场景)
示例:

use tokio::time::{ interval, Duration };#[tokio::main]
async fn main() {// 创建一个间隔为 1 秒的计时器let mut interval = interval(Duration::from_secs(1));for _ in 0..5 {// 调用 tick 方法来等待下一个时间点interval.tick().await;println!("周期性任务: {:?}", tokio::time::Instant::now());}
}

在这里插入图片描述

四、futures库

Rust的异步模型由Future trait和async/await语法构建。Future代表一个还没完成的异步计算。核心理念:
异步操作不会阻塞线程,遇到无法继续时让出控制权。
编译器通过state machine优化,零cost abstraction。
生态里有tokio、async-std等流行的异步运行时,而futures库则是最基础的异步构建块。

4.1 futures库的历史和生态地位

futures库最早由社区推出,后与标准库/async await语法深度集成,是tokio、async-std等库的底层依赖。
futures 0.3 版本以后与async/await语法紧密结合,兼容性好。
标准库的Future trait基本和futures一致,futures库则包含了大量扩展工具、Stream等trait、组合子等。

4.2 Future Trait详解

4.2.1 Future的定义

标准库和futures库中的Future trait定义基本相同:

pub trait Future {type Output;fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

关键点解析:
type Output: Future最终完成后返回的数据类型。
poll方法: 轮询future是否完成。返回Poll::Pending表示还未完成,Poll::Ready(val)表示已完成并返回结果。
Pin<&mut Self>: 防止future在未完成时被移动,保证安全。
Context: 提供了唤醒器(waker),用于通知执行器“自己已准备好继续被poll”。

4.2.2 poll的状态机原理

每次poll时,future推进一步,有可能完成、可能还需等待。
执行器根据waker回调判断何时重新poll。
async/await生成的future内部实现了状态机。

4.3 Future的状态机原理

编译器会将async块/函数转换成状态机。比如:

async fn foo() -> u32 {1 + bar().await
}

编译后等价于:

enum FooState {Start,WaitingForBar(Pin<Box<dyn Future<Output=u32>>>),Done,
}

每次poll,根据内部状态做切换。

4.4 Future的poll方法和Pin/Unpin

4.4.1 为什么需要Pin

Future通常捕获了引用或者局部变量,移动会导致悬垂指针。
Pin保证了future在内存地址上的“固定性”。

4.4.2 Unpin的意义

如果类型实现了Unpin trait,则可以安全地移动。
大多数简单Future是Unpin的,但包含自引用字段的类型通常不能Unpin。

4.5 futures库的核心Trait

4.5.1 Future Trait

与标准库的Future几乎一致,但futures库提供了更多组合工具和扩展方法。

4.5.2 Stream Trait
pub trait Stream {type Item;fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

异步产生一系列值(如异步迭代器)。
返回Option<Item>,None代表流结束。

4.5.3 Sink Trait

表示“异步可写入的对象”(如异步写管道)。

4.5.4 AsyncRead/AsyncWrite Trait

标准库Read/Write的异步版,类似于tokio::io。

4.6 Future的创建

4.6.1 async/await生成Future

最常见用法:

async fn fetch() -> String {// 省略异步IO"hello".to_string()
}let f = fetch();

这里f即为Future,实现了Future trait。

4.6.2 手动实现Future
use std::pin::Pin;
use std::task::{Context, Poll};
use std::future::Future;struct MyFuture;impl Future for MyFuture {type Output = u32;fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {Poll::Ready(42)}
}
4.6.3 futures::future模块

提供了大量辅助函数,如ready、pending、join、select等:

use futures::future::{ready, pending, join};let fut1 = ready(1);
let fut2 = ready(2);let fut = join(fut1, fut2); // 并行等待多个future

4.7 Future的组合子(combinator)详解

组合子(combinator)是函数式编程中的重要概念,可以对Future进行链式组合。

4.7.1 then、map、and_then
use futures::future::FutureExt;let fut = async { 2u32 };
let fut2 = fut.map(|x| x + 3);

map:Future完成后对结果进行变换。
then:Future完成后再执行另一个异步操作。
and_then:只在Ok时继续下一个异步操作(通常和Result结合)。

4.7.2 join, join_all, try_join

join!宏 / futures::future::join 并发等待多个future。
join_all 等待所有future完成,返回Vec结果。
try_join:遇到Err立即返回。

4.7.3 select

select!宏 / futures::future::select 谁先完成就返回谁。

use futures::future::{select, Either, ready};let fut1 = ready(1);
let fut2 = ready(2);let fut = select(fut1, fut2); // 谁先完成,谁的结果就先出来

4.8 Future的运行与执行器

Future只是一种描述,不会自动执行。必须由executor(执行器)驱动poll。

4.8.1 手动驱动Future
use futures::executor::block_on;let fut = async { 5 };
let val = block_on(fut);
println!("{}", val);
4.8.2 futures库的执行器

block_on:阻塞线程直到future完成。
LocalPool:本地任务池,可多次poll,适合单线程场景。
通常实际项目用tokio或async-std的执行器。

4.9 futures::Stream详解与常用操作

4.9.1 Stream的创建
use futures::stream::{self, StreamExt};let s = stream::iter(vec![1,2,3]); // 生成一个同步Stream

Tokio 提供了对 流(stream)处理的支持,主要通过与 futures::Stream trait 以及 tokio-stream 辅助库结合使用来实现异步流的处理。

Tokio中的流基础
Rust 中的异步流(Stream)类似于异步版的迭代器(Iterator)。它代表一系列异步产生的值。

use futures::stream::Stream;Stream trait 定义如下(简化):
trait Stream {type Item;fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

Tokio 官方并未内置大量 stream 类型,而是推荐使用:
tokio-stream:提供 Stream 的适配器和扩展
futures:提供 Stream trait 以及 stream 工具

4.9.2 异步流的处理

StreamExt trait提供了许多常用方法:
next().await 异步获取下一个元素
for_each, map, filter, fold, collect

use futures::stream::StreamExt;let mut s = stream::iter(vec![1, 2, 3]);
while let Some(item) = s.next().await {println!("item = {}", item);
}
4.9.3 Stream与Future互转

stream.next() 返回的是Future<Option<Item>>
futures::stream::once 可将Future转为Stream

4.10 Stream与Sink的协作

Sink表示异步写入接口。

use futures::{SinkExt, stream};let (mut tx, mut rx) = futures::channel::mpsc::channel(10);
tx.send("hello").await.unwrap();while let Some(item) = rx.next().await {println!("{}", item);
}

常用场景:
websocket通信
网络协议的异步收发

4.11 通道(Channel)与异步消息通信

futures库提供多种channel:
futures::channel::mpsc:多生产者单消费者通道
futures::channel::oneshot:一次性单向信号
futures::channel::oneshot::channel

示例:

use futures::channel::oneshot;let (tx, rx) = oneshot::channel();
tokio::spawn(async move {tx.send(42).unwrap();
});
let res = rx.await.unwrap();
println!("res = {}", res);

4.12 Future与同步世界的桥梁

block_on:同步代码等待异步future完成
spawn_blocking:在异步世界中执行阻塞任务

use futures::executor::block_on;
let res = block_on(async {// ...异步代码123
});
println!("{}", res);

4.13 Future取消、超时与select

4.13.1 select选择

select! 宏可以等待多个future,只要其中一个完成就返回。

use futures::future::{self, select};
use futures::pin_mut;let fut1 = async { /* ... */ };
let fut2 = async { /* ... */ };
pin_mut!(fut1, fut2);
let res = select(fut1, fut2).await;
4.13.2 超时

futures库有futures::future::FutureExt::timeout方法:

use futures::FutureExt;
use std::time::Duration;let f = async {tokio::time::sleep(Duration::from_secs(3)).await;5
};
let res = f.timeout(Duration::from_secs(2)).await;
match res {Ok(val) => println!("val = {}", val),Err(_) => println!("timeout!"),
}
4.13.3 取消

Rust没有强制取消,但可以通过select和drop提前终止。

4.14 错误处理与Result、Option结合

许多异步操作返回Future<Output=Result<T, E>>
组合子如try_join、try_for_each遇到Err会直接返回。

use futures::future::try_join;let f1 = async { Ok::<_, std::io::Error>(1) };
let f2 = async { Ok::<_, std::io::Error>(2) };let res = try_join(f1, f2).await;
http://www.lryc.cn/news/602018.html

相关文章:

  • Java 排序
  • 股指期货周度想法
  • RWA 正当红,是 DeFi 的终点、拐点,还是新起点?
  • 【C++】手搓一个STL风格的vector容器
  • 7.DRF 过滤、排序、分页
  • 开发指南125-HTML DOM事件
  • 【Linux篇章】穿越数据迷雾:HTTPS构筑网络安全的量子级护盾,重塑数字信任帝国!
  • Kafka——请求是怎么被处理的?
  • 云原生MySQL Operator开发实战(三):高级特性与生产就绪功能
  • RabbitMQ+内网穿透远程访问教程:实现异地AMQP通信+Web管理
  • MongoDB索引及其原理
  • Java#包管理器来时的路
  • k8s的权限
  • Windows|CUDA和cuDNN下载和安装,默认安装在C盘和不安装在C盘的两种方法
  • C++ 中实现 `Task::WhenAll` 和 `Task::WhenAny` 的两种方案
  • Android启动时间优化大全
  • i节点学习
  • JavaScript核心概念全解析
  • Flutter中 Provider 的基础用法超详细讲解(二)之ChangeNotifierProvider
  • Vim 编辑器工作模式及操作指南
  • Spring AI 项目实战(二十一):Spring Boot + AI +DeepSeek驱动的智能题库系统(附完整源码)
  • zabbix-agent静默安装
  • @RefreshScope 核心原理深度解析:Spring Boot 的动态魔法
  • 抗辐照芯片在低轨卫星星座CAN总线通讯及供电系统的应用探讨
  • 第二阶段-第二章—8天Python从入门到精通【itheima】-138节(MySQL的综合案例)
  • 【程序员私房菜】python洋葱炒王中王火腿肠
  • 数据结构基础内容(第二篇:线性结构)
  • 【LeetCode刷题指南】--设计循环队列
  • 自由学习记录(74)
  • 【LeetCode 热题 100】51. N 皇后——回溯