当前位置: 首页 > news >正文

R 语言 | future 包,非阻塞的执行耗时脚本

目的:有一段代码,后面暂时用不到,但是又很耗时,占了当前R session,难道只能等半个小时,等到它结束才能画图?

可以使用R多线程,在支线进程中执行耗时任务,同时不阻塞当前R进程,可以继续干活。

  • 在Rstudio下只能使用 plan(multisession, workers=5)。
  • 在shell R下,还可以使用 plan(multiprocess) ,plan(multicore)[Not supported on Windows.]

Strategy ‘multiprocess’ is deprecated in future (>= 1.20.0) [2020-10-30].
Instead, explicitly specify either ‘multisession’ (recommended) or ‘multicore’. In the current R session, ‘multiprocess’ equals ‘multisession’.

1. 非阻塞的启动R多进程支线任务

# 第一步,开启多线程。只能linux系统,必须开多线程
library(future)
plan(multisession, workers = 3) # 设置为多进程模式# 第二部:定义耗时任务
save_task <- future({pid=Sys.getpid()print( paste0("Start ...", pid) )#save.image("before.RData")Sys.sleep(30) # 模拟耗时1小时# 这里保存Rds文件,可能需要花费几十分钟到2个小时# save.image("end.RData")print( paste0("End...", pid) )100 #最后一行是线程的返回值,类似函数的感觉。
})# 第三步,继续执行其他任务
# 在任务运行时,可以继续执行其他代码,不用等待耗时任务结束
print("正在后台保存文件,可以继续执行其他任务...")

2.检查一个支线任务是否结束

  • 要检查任务的状态,可以使用 value() 函数,它会阻塞直到任务完成并返回结果。
  • resolved() 函数用于检查任务是否已完成,是非阻塞的。
  • 如果想非阻塞地检查结果,可以结合使用 resolved() 和 value() 函数。
# f 是一个future任务 
# 非阻塞地检查是否完成
if (resolved(f)) {result <- value(f)  # 如果已完成,获取结果print(result)
} else {print("任务尚未完成")
}

继续本示例:

resolved(save_task) #非阻塞的查看状态,可以随时查看状态,返回T/F
> value(save_task ) #阻塞的查看返回结果:最后一行的值
[1] "Start ...1389"
[1] "End...1389"plan(sequential) # 恢复 设置为单进程模式

3. 等待所有支线任务都结束

场景:分别计算每个亚群的高变基因,每个亚群都计算完才能进行下一步。

# arr 是数组,其成员是 future 变量
while(any(!resolved(arr))){ } #阻塞,直到所有支线都出结果:直到任何一个都是T

4. 竞速模式:等待最快的一个线程得到结果

场景:分别使用多个网站下载数据,只要有一个途径下载好,即可开始下一步。

require(future)
#plan(multiprocess)
plan(multisession, workers=5)longRunningFunction <- function(value, seed=0) {set.seed(seed)random1<- runif(min= 5 ,max = 30,n = 1)Sys.sleep(random1)return(value)
}arr = list()#changed starting number to 1 since R lists start at 1, not 0
i=1#If the number of futures generated is more than the number of cores available, then the main thread will block until the first future completes and allows more futures to be started
while(i < 6) {arr[[i]] = future(longRunningFunction(i, seed = i), seed = T)i = i + 1
}# 一开始都是F,都没有解析出;最后都是T,都解析了。
while(any(!resolved(arr))){ } #阻塞,直到所有支线都出结果:直到任何一个都是T
#while(all(!resolved(arr))){ } #阻塞,直到有一个支线出结果: 直到至少一个是T[竞速模式]raceresults_from_future<-lapply(arr[resolved(arr)], value)
print(paste("raceresults_from_future: ",raceresults_from_future) )

5. 传入环境,给多线程的内存消耗瘦身

future开启的支线默认是复制主进程的全部环境,这会导致R的内存开销很大,而其中很大一部分是用不到的拷贝。
可以手动指定某些变量传入执行环境,来达到多进程内存瘦身的目的。

out.a=123# 1 开启多线程。只能linux系统,必须开多线程
library(future)
plan(multisession, workers = 3) # 设置为多进程模式# 2 创建环境
#  用于限定多进程拷贝的变量个数,默认是拷贝父进程的整个环境
e1 <- new.env(parent = baseenv())
e1$a=out.a+1902
get("a", e1) #2025 #获取环境中的变量值assign("b", -3210, envir = e1) #给环境e1中的变量b赋值
get("b", envir = e1) #03210 获取环境中的变量值# 3 环境作为future的第二个参数
task2 <- future({pid=Sys.getpid()print(sprintf("task [%s]", pid))Sys.sleep(5)a+100 #最后一行是线程的返回值,类似函数的感觉。
}, envir = e1 ) #envir 传入变量,否则默认传入父环境# 可以执行其他任务# 4 查看多进程结果
resolved(task2) #非阻塞(立刻返回)的查看状态,可以随时查看状态,返回T/F
while(!resolved(task2)){} #阻塞,直到支线任务完成
value(task2 ) #阻塞的(直到有结果)查看返回结果:最后一行的值 2125
task2plan(sequential) # 恢复 设置为单进程模式

Ref:

  • https://rstudio.github.io/promises/articles/promises_03_overview.html
  • 竞速模式 https://stackoverflow.com/questions/52040744/r-waiting-for-a-list-of-promises-to-resolve
http://www.lryc.cn/news/527346.html

相关文章:

  • UE学习日志#12 Niagara特效大致了解(水文,主要是花时间读了读文档和文章)
  • 【数据结构】_链表经典算法OJ:合并两个有序数组
  • Mongodb副本集群为什么选择3个节点不选择4个节点
  • 基于 WEB 开发的手机销售管理系统设计与实现内容
  • LeetCode - Google 大模型校招10题 第1天 Attention 汇总 (3题)
  • Vue3 provide/inject用法总结
  • Linux——网络基础(1)
  • 【记录】日常|从零散记录到博客之星Top300的成长之路
  • 【二分查找】力扣373. 查找和最小的 K 对数字
  • 池化层Pooling Layer
  • 力扣算法题——11.盛最多水的容器
  • 自由学习记录(32)
  • VScode+Latex (Recipe terminated with fatal error: spawn xelatex ENOENT)
  • 「蓝桥杯题解」蜗牛(Java)
  • PHP EOF (Heredoc) 详解
  • pyautogui操控Acrobat DC pro万能PDF转Word,不丢任何PDF格式样式
  • Day32:字符串的复制
  • 基于Mybatis继承AbstractRoutingDataSource使用自定义注解实现动态数据源
  • ZooKeeper 数据模型
  • 【VUE】Vue2中Vue.extend方法
  • MaskGAE论文阅读
  • Mybatis-plus 更新 Null 的策略踩坑记
  • Oracle迁移DM数据库
  • HTML特殊符号的使用示例
  • 数据结构基础之《(15)—排序算法小结》
  • Linux系统下速通stm32的clion开发环境配置
  • 【2024年 CSDN博客之星】我的2024年创作之旅:从C语言到人工智能,个人成长与突破的全景回顾
  • Python 轻松扫描,快速检测:高效IP网段扫描工具全解析
  • go入门Windows环境搭建
  • 安装Ubuntu22.04