当前位置: 首页 > news >正文

foreach 块并行加速

 实例1

1. 任务分块(chunking)

我们手动把 1:nrow(pair_list_df) 切分为 N 块,每块是一个线程要处理的任务:

  • 每个线程一次处理一个“任务块”而不是一个“任务点”,极大减少调度开销。

  • 保证线程之间处理量均衡,避免有的线程闲了、有的线程忙到最后。

library(dplyr)
library(stringr)
library(foreach)
library(doParallel)# 设置并行核心数
n_cores <- parallel::detectCores() - 1
cl <- makeCluster(n_cores)
registerDoParallel(cl)# 预处理 pair list
pair_list_df <- pair_list %>%str_split_fixed("__", 2) %>%as.data.frame(stringsAsFactors = FALSE) %>%filter(V1 %in% mRNA_id & V2 %in% miRNA_id)rm(pair_list)# 按核心数将任务切块
block_indices <- split(1:nrow(pair_list_df), cut(1:nrow(pair_list_df), n_cores, labels = FALSE))# 并行计算
results <- foreach(block = block_indices, .combine = rbind,.packages = c("dplyr")) %dopar% {block_result <- lapply(block, function(i) {tryCatch({mRNA_name <- pair_list_df$V1[i]miRNA_name <- pair_list_df$V2[i]x <- as.numeric(Esn_transcript_TPM[mRNA_name, ])y <- as.numeric(miRNAs_expressed_TPM[miRNA_name, ])if (all(is.na(x)) || all(is.na(y)) || sd(x, na.rm = TRUE) == 0 || sd(y, na.rm = TRUE) == 0) {return(data.frame(row = i, mRNA = mRNA_name, miRNA = miRNA_name, cor = NA, pvalue = NA))}test <- cor.test(x, y, method = "pearson")data.frame(row = i, mRNA = mRNA_name, miRNA = miRNA_name, cor = test$estimate, pvalue = test$p.value)}, error = function(e) {mRNA_name <- pair_list_df$V1[i]miRNA_name <- pair_list_df$V2[i]data.frame(row = i, mRNA = mRNA_name, miRNA = miRNA_name, cor = NA, pvalue = NA)})})do.call(rbind, block_result)
}# 添加结果回原始表
pair_list_df$cor <- results$cor
pair_list_df$pvalue <- results$pvalue# 可选:输出完整 results 表,包括 row、mRNA、miRNA、cor、pvalue
# View(results) 或 write.csv(results, "correlation_results.csv", row.names = FALSE)# 关闭并行线程
stopCluster(cl)

实例2 从基因gtf文件提取,将所有mRNA的位置分为100个2%的滑动窗口

library(dplyr)
library(tibble)# 可选:如果有染色体长度表 chr_len(命名向量),可用于上边界裁剪
# 例:chr_len <- c("NC_066509.1"=100000000, "NC_066510.1"=95000000, ...)
# 没有就注释掉下一行,以及下面代码中的 pmin(., chr_len[chr]) 那部分
# chr_len <- NULLsliding_windows <- tibble()   # 直接用数据框累积# 固定上下游参数
UPDN_LEN  <- 2000L
UPDN_WIN  <- max(1L, floor(UPDN_LEN * 0.02))  # 40
UPDN_STEP <- max(1L, floor(UPDN_LEN * 0.01))  # 20for (i in seq_len(nrow(mRNA_gtfdata))) {chr    <- mRNA_gtfdata$seqnames[i]s      <- as.integer(mRNA_gtfdata$start[i])e      <- as.integer(mRNA_gtfdata$end[i])strand <- as.character(mRNA_gtfdata$strand[i])gid    <- as.character(mRNA_gtfdata$gene_id[i])# ========= 1) gene body =========len     <- e - s + 1Lwin_len <- max(1L, floor(len * 0.02))step    <- max(1L, floor(len * 0.01))if (len >= win_len) {b_starts <- seq.int(s, e - win_len + 1L, by = step)b_ends   <- pmin(b_starts + win_len - 1L, e)# 1..100 相对编号(按区域起止映射;如需按转录方向,可把 frac 换成 strand-aware 的)b_centers <- (b_starts + b_ends) / 2b_frac    <- (b_centers - s) / lenb_rank    <- pmax(1L, pmin(100L, floor(b_frac * 100) + 1L))df_body <- tibble(seqnames  = chr,win_start = b_starts,win_end   = b_ends,gene_id   = gid,strand    = strand,region    = "body",rank      = b_rank)sliding_windows <- bind_rows(sliding_windows, df_body)}# ========= 2) upstream 2kb =========if (strand == "+") {us <- s - UPDN_LENue <- s - 1L} else {us <- e + 1Lue <- e + UPDN_LEN}# 下边界裁到 1,上边界如有 chr_len 可再裁一次us <- max(1L, us)# 若有 chr_len: ue <- if (is.null(chr_len)) ue else min(ue, as.integer(chr_len[chr]))if (ue - us + 1L >= UPDN_WIN) {u_starts <- seq.int(us, ue - UPDN_WIN + 1L, by = UPDN_STEP)u_ends   <- pmin(u_starts + UPDN_WIN - 1L, ue)u_centers <- (u_starts + u_ends) / 2u_frac    <- (u_centers - us) / (ue - us + 1L)u_rank    <- pmax(1L, pmin(100L, floor(u_frac * 100) + 1L))df_up <- tibble(seqnames  = chr,win_start = u_starts,win_end   = u_ends,gene_id   = gid,strand    = strand,region    = "upstream2k",rank      = u_rank)sliding_windows <- bind_rows(sliding_windows, df_up)}# ========= 3) downstream 2kb =========if (strand == "+") {ds <- e + 1Lde <- e + UPDN_LEN} else {ds <- s - UPDN_LENde <- s - 1L}ds <- max(1L, ds)# 若有 chr_len: de <- if (is.null(chr_len)) de else min(de, as.integer(chr_len[chr]))if (de - ds + 1L >= UPDN_WIN) {d_starts <- seq.int(ds, de - UPDN_WIN + 1L, by = UPDN_STEP)d_ends   <- pmin(d_starts + UPDN_WIN - 1L, de)d_centers <- (d_starts + d_ends) / 2d_frac    <- (d_centers - ds) / (de - ds + 1L)d_rank    <- pmax(1L, pmin(100L, floor(d_frac * 100) + 1L))df_down <- tibble(seqnames  = chr,win_start = d_starts,win_end   = d_ends,gene_id   = gid,strand    = strand,region    = "downstream2k",rank      = d_rank)sliding_windows <- bind_rows(sliding_windows, df_down)}print(i)
}# 结果预览
dplyr::glimpse(sliding_windows)
head(sliding_windows, 10)####  多线程版本library(doParallel)
library(foreach)
library(dplyr)
library(tibble)# 可选:染色体长度(如果有就填,避免越界;没有就留 NULL)
# chr_len <- c("NC_066509.1"=123456789, ...)
chr_len <- NULL# 上/下游固定参数
UPDN_LEN  <- 2000L
UPDN_WIN  <- max(1L, floor(UPDN_LEN * 0.02))  # 40 bp
UPDN_STEP <- max(1L, floor(UPDN_LEN * 0.01))  # 20 bp# 并行环境
n_cores <- max(1, parallel::detectCores() - 1)
cl <- makeCluster(n_cores)
registerDoParallel(cl)sliding_windows <- foreach(i = seq_len(nrow(mRNA_gtfdata)),.combine = dplyr::bind_rows,.packages = c("dplyr","tibble")) %dopar% {chr    <- mRNA_gtfdata$seqnames[i]s      <- as.integer(mRNA_gtfdata$start[i])e      <- as.integer(mRNA_gtfdata$end[i])strand <- as.character(mRNA_gtfdata$strand[i])gid    <- as.character(mRNA_gtfdata$gene_id[i])out_list <- list()# 1) gene body:窗=2%len,步=1%len;rank 随转录方向(5'->3')递增len     <- e - s + 1Lif (len > 0L) {win_len <- max(1L, floor(len * 0.02))step    <- max(1L, floor(len * 0.01))if (len >= win_len) {b_starts <- seq.int(s, e - win_len + 1L, by = step)b_ends   <- pmin(b_starts + win_len - 1L, e)centers <- (b_starts + b_ends) / 2# strand-aware:正链从 s->e,负链从 e->sfrac <- if (strand == "+") (centers - s)/len else (e - centers)/lenb_rank <- pmax(1L, pmin(100L, floor(frac * 100) + 1L))out_list$body <- tibble(seqnames  = chr,win_start = b_starts,win_end   = b_ends,gene_id   = gid,strand    = strand,region    = "body",rank      = b_rank)}}# 小工具:裁边(避免 <1;如提供 chr_len 再裁上界)clamp <- function(x, chr) {x <- pmax(1L, x)if (!is.null(chr_len) && !is.na(chr_len[chr])) x <- pmin(x, as.integer(chr_len[chr]))x}# 2) upstream 2kb(按链向定义)if (strand == "+") { us <- s - UPDN_LEN; ue <- s - 1L } else { us <- e + 1L; ue <- e + UPDN_LEN }us <- clamp(us, chr); ue <- clamp(ue, chr)if (ue - us + 1L >= UPDN_WIN) {u_starts <- seq.int(us, ue - UPDN_WIN + 1L, by = UPDN_STEP)u_ends   <- pmin(u_starts + UPDN_WIN - 1L, ue)u_centers <- (u_starts + u_ends) / 2# 远端->近端 映射到 1..100u_frac <- (u_centers - us) / (ue - us + 1L)u_rank <- pmax(1L, pmin(100L, floor(u_frac * 100) + 1L))out_list$up <- tibble(seqnames  = chr,win_start = u_starts,win_end   = u_ends,gene_id   = gid,strand    = strand,region    = "upstream2k",rank      = u_rank)}# 3) downstream 2kb(按链向定义)if (strand == "+") { ds <- e + 1L; de <- e + UPDN_LEN } else { ds <- s - UPDN_LEN; de <- s - 1L }ds <- clamp(ds, chr); de <- clamp(de, chr)if (de - ds + 1L >= UPDN_WIN) {d_starts <- seq.int(ds, de - UPDN_WIN + 1L, by = UPDN_STEP)d_ends   <- pmin(d_starts + UPDN_WIN - 1L, de)d_centers <- (d_starts + d_ends) / 2d_frac <- (d_centers - ds) / (de - ds + 1L)d_rank <- pmax(1L, pmin(100L, floor(d_frac * 100) + 1L))out_list$down <- tibble(seqnames  = chr,win_start = d_starts,win_end   = d_ends,gene_id   = gid,strand    = strand,region    = "downstream2k",rank      = d_rank)}dplyr::bind_rows(out_list)}stopCluster(cl)# 结果查看
dplyr::glimpse(sliding_windows)
head(sliding_windows, 10)####  多线程版本块并行写法
#  任务分块(chunking)
library(doParallel)
library(foreach)
library(dplyr)
library(tibble)## 并行环境 ---------------------------------------------------------------
n_cores <- max(1, parallel::detectCores() - 1)
cl <- makeCluster(n_cores)
registerDoParallel(cl)## 可选:染色体长度(命名向量;没有就设为 NULL)
# chr_len <- c("NC_066509.1"=123456789, ...)
chr_len <- NULL## 上/下游固定参数 --------------------------------------------------------
UPDN_LEN  <- 2000L
UPDN_WIN  <- max(1L, floor(UPDN_LEN*0.02))  # 40bp
UPDN_STEP <- max(1L, floor(UPDN_LEN*0.01))  # 20bp## 将任务切块:每核一块(行号均分到 n_cores 份) --------------------------
idx_all <- seq_len(nrow(mRNA_gtfdata))
block_indices <- split(idx_all, cut(idx_all, n_cores, labels = FALSE))## 开撸:块并行,每块内部用 for 循环拼成一个 data.frame 再返回 --------------
sliding_windows <- foreach(blk = block_indices,.combine = dplyr::bind_rows,.multicombine = TRUE,.maxcombine = n_cores,.packages = c("dplyr","tibble")
) %dopar% {out_block <- vector("list", length(blk))  # 先预分配,块内少拷贝k <- 0Lfor (i in blk) {chr    <- mRNA_gtfdata$seqnames[i]s      <- as.integer(mRNA_gtfdata$start[i])e      <- as.integer(mRNA_gtfdata$end[i])strand <- as.character(mRNA_gtfdata$strand[i])gid    <- as.character(mRNA_gtfdata$gene_id[i])res_list <- list()## 1) body:窗=2%len,步=1%len;rank 随转录方向(5'->3')len <- e - s + 1Lif (len > 0L) {win_len <- max(1L, floor(len * 0.02))step    <- max(1L, floor(len * 0.01))if (len >= win_len) {b_starts <- seq.int(s, e - win_len + 1L, by = step)b_ends   <- pmin(b_starts + win_len - 1L, e)centers <- (b_starts + b_ends) / 2frac    <- if (strand == "+") (centers - s)/len else (e - centers)/lenb_rank  <- pmax(1L, pmin(100L, floor(frac*100) + 1L))res_list$body <- tibble(seqnames  = chr,win_start = b_starts,win_end   = b_ends,gene_id   = gid,strand    = strand,region    = "body",rank      = b_rank)}}## clamp:裁边clamp <- function(x) {x <- pmax(1L, x)if (!is.null(chr_len) && !is.na(chr_len[chr])) x <- pmin(x, as.integer(chr_len[chr]))x}## 2) upstream 2kbif (strand == "+") { us <- s - UPDN_LEN; ue <- s - 1L } else { us <- e + 1L; ue <- e + UPDN_LEN }us <- clamp(us); ue <- clamp(ue)if (ue - us + 1L >= UPDN_WIN) {u_starts <- seq.int(us, ue - UPDN_WIN + 1L, by = UPDN_STEP)u_ends   <- pmin(u_starts + UPDN_WIN - 1L, ue)u_centers<- (u_starts + u_ends) / 2u_frac   <- (u_centers - us) / (ue - us + 1L)u_rank   <- pmax(1L, pmin(100L, floor(u_frac*100) + 1L))res_list$up <- tibble(seqnames  = chr,win_start = u_starts,win_end   = u_ends,gene_id   = gid,strand    = strand,region    = "upstream2k",rank      = u_rank)}## 3) downstream 2kbif (strand == "+") { ds <- e + 1L; de <- e + UPDN_LEN } else { ds <- s - UPDN_LEN; de <- s - 1L }ds <- clamp(ds); de <- clamp(de)if (de - ds + 1L >= UPDN_WIN) {d_starts <- seq.int(ds, de - UPDN_WIN + 1L, by = UPDN_STEP)d_ends   <- pmin(d_starts + UPDN_WIN - 1L, de)d_centers<- (d_starts + d_ends) / 2d_frac   <- (d_centers - ds) / (de - ds + 1L)d_rank   <- pmax(1L, pmin(100L, floor(d_frac*100) + 1L))res_list$down <- tibble(seqnames  = chr,win_start = d_starts,win_end   = d_ends,gene_id   = gid,strand    = strand,region    = "downstream2k",rank      = d_rank)}k <- k + 1Lout_block[[k]] <- dplyr::bind_rows(res_list)}# 把该块内所有基因的窗口拼成一个 data.frame 返回dplyr::bind_rows(out_block[seq_len(k)])
}stopCluster(cl)# 看看结果
dplyr::glimpse(sliding_windows)
head(sliding_windows, 10)

http://www.lryc.cn/news/617457.html

相关文章:

  • docker compose和docker-compose命令的区别
  • 力扣164:最大间距
  • 大数据系统架构模式:驾驭海量数据的工程范式
  • React(四):事件总线、setState的细节、PureComponent、ref
  • LeetCode 2438.二的幂数组中查询范围内的乘积:模拟(前缀和可选)
  • C++项目实战(日期类的实现)
  • MFC C++ 使用ODBC方式调用Oracle数据库的详细步骤
  • 重学React(五):脱围机制一
  • 金蝶云星辰:赋能企业数据管理
  • spring boot 整合redis教程
  • 带简易后台管理的米表系统 域名出售系统 自适应页面
  • 帝国理工学院团队研发:Missense3D-PTMdb—— 解析遗传变异与翻译后修饰的交互式工具
  • 计算机网络---交换机
  • 套接字技术、视频加载技术、断点续传技术
  • Horse3D引擎研发笔记(四):在QtOpenGL下仿three.js,封装EBO绘制四边形
  • 2025 年国内可用 Docker 镜像加速器地址
  • Rust面试题及详细答案120道(19-26)-- 所有权与借用
  • 《基于Pytorch实现的声音分类 :网页解读》
  • YOLOv8 训练报错:PyTorch 2.6+ 模型加载兼容性问题解决
  • 【JavaEE】(12) 创建一个 Sring Boot 项目
  • 第二届机电一体化、机器人与控制系统国际会议(MRCS 2025)
  • 34-Hive SQL DML语法之查询数据-3
  • 2025世界机器人大会,多形态机器人开启商业化落地浪潮
  • [4.2-2] NCCL新版本的register如何实现的?
  • GAI 与 Tesla 机器人的具体联动机制
  • 记录一下通过STC的ISP软件修改stc32的EEPROM值大小
  • VoxCraft-生数科技推出的免费3D模型AI生成工具
  • uni-app app端安卓和ios如何申请麦克风权限,唤起提醒弹框
  • 设计模式笔记_结构型_组合模式
  • 5G NTN 卫星测试产品