当前位置: 首页 > news >正文

Rust 并行库 crossbeam 的 Channel 示例

示例1

一个不完整的示例:

let (tx, rx) = channel::unbounded::<Task>();
let mut handlers = vec![];for _ in 0..number {let rx = rx.clone();let handle = thread::spawn(move || {while let Some(task) = rx.recv() {task.call_box();}});handlers.push(handle);
}

该例子中,rx 可以被多个线程使用,是线程安全的。这就是所谓的 MPMC 模式。设想 channel 中有 10 个数据,MPMC 模式允许10个线程同时利用 rx 从 channel 中读取数据。

Rust 自带的 channel 是 MPSC 模式的,一次仅允许一个线程从 channel 读取数据。显然 crossbeam 效率更高。

示例2

use crossbeam::channel;
use crossbeam::thread;
use std::thread::sleep;
use std::time::Duration;// 定义Task结构体
struct Task {data: usize, // 假设任务包含一个数据字段call_box: Box<dyn FnMut()>, // 假设任务包含一个可调用对象的装箱指针
}impl Task {fn new(data: usize, call_box: impl FnMut() + 'static) -> Self {Task {data,call_box: Box::new(call_box),}}// 实现call_box方法fn call_box(&mut self) {(self.call_box)();}
}fn main() {const NUMBER_OF_WORKERS: usize = 4; // 假设有4个工作线程let (tx, rx) = channel::unbounded::<Task>();let mut handlers = vec![];// 启动工作线程for _ in 0..NUMBER_OF_WORKERS {let rx = rx.clone();let handle = thread::spawn(move || {while let Some(task) = rx.recv() {task.call_box(); // 执行任务}});handlers.push(handle);}// 发送任务到通道for i in 0..10 { // 假设发送10个任务let task = Task::new(i, || {println!("Executing task with data: {}", i);sleep(Duration::from_secs(1)); // 模拟耗时操作println!("Finished task with data: {}", i);});tx.send(task).unwrap();}// 关闭发送通道drop(tx);// 等待所有工作线程完成for handle in handlers {handle.join().unwrap();}println!("All tasks are processed.");
}

在这个程序中,我们定义了一个Task结构体,它包含一个data字段和一个call_box字段,后者是一个装箱的可调用对象。我们实现了call_box方法,它调用这个装箱的可调用对象。

main函数中,我们创建了一个无界通道,用于在工作线程和主线程之间传递Task实例。我们启动了NUMBER_OF_WORKERS个工作线程,它们不断地从通道接收Task实例并调用call_box方法执行它们。

然后,主线程创建了一些Task实例,并通过通道发送它们给工作线程。一旦所有任务都被发送,主线程通过drop(tx)关闭了发送通道,这样工作线程在尝试接收任务时,如果没有更多任务可用,将会得到一个None,从而退出循环。

最后,主线程等待所有工作线程完成,并打印出消息表示所有任务都已经处理完毕。

请注意,为了简化示例,我使用了Box<dyn FnMut()>来允许Task存储任何可调用对象的装箱指针。这意味着任务中的可调用对象必须能够单独编译成一个独立的、无状态的函数,这样才能安全地在多个线程之间共享。在实际应用中,你可能需要根据你的具体需求调整Task结构体的设计和使用方式。

http://www.lryc.cn/news/321023.html

相关文章:

  • 缓存雪崩、缓存穿透、缓存预热、缓存更新、缓存降级的理解
  • springcloud gateway
  • JAVA八股day1
  • 探索拓展坞的奥秘:提升电脑接口的无限可能
  • Linux中执行脚本报错(脚本乱码问题)
  • el-table按钮获取当前行元素
  • MySQL数据导入的方式介绍
  • 构建部署_Docker常用命令
  • Spring Boot Actuator介绍
  • 数据库中DQL、DML、DDL、DCL的概念与区别
  • MacOS---设置Java环境变量
  • 使用 Boot Camp 助理查明您的 Mac 需不需要 Windows 安装介质
  • KY105 整除问题(用Java实现)
  • C++ 接口的实现,及作用通俗理解方式
  • TypeScript:typescript的安装与运行
  • 【代码随想录Day27】
  • 【一】【单片机】有关LED的实验
  • 面试算法-49-缺失的第一个正数
  • 论文笔记:液体管道泄漏综合检测与定位模型
  • 抖音视频批量提取软件|无水印视频下载
  • Linux docker1--环境及docker安装
  • uniapp使用uview - DatetimePicker 时间选择器 /时间戳转化
  • python实现websocket
  • ElasticSearch简介及常见用法
  • js iframe获取documen中的对象为空问题
  • vue3子父组件之间的调用
  • 用 二层口 实现三层口 IP 通信的一个实现方法
  • (学习日记)2024.03.12:UCOSIII第十四节:时基列表
  • 四.流程控制(顺序,分支,循环,嵌套)
  • 了解常用开发模型 -- 瀑布模型、螺旋模型、增量与迭代、敏捷开发