foreach 块并行加速
实例1
1. 任务分块(chunking)
我们手动把 1:nrow(pair_list_df)
切分为 N 块,每块是一个线程要处理的任务:
每个线程一次处理一个“任务块”而不是一个“任务点”,极大减少调度开销。
保证线程之间处理量均衡,避免有的线程闲了、有的线程忙到最后。
library(dplyr)
library(stringr)
library(foreach)
library(doParallel)# 设置并行核心数
n_cores <- parallel::detectCores() - 1
cl <- makeCluster(n_cores)
registerDoParallel(cl)# 预处理 pair list
pair_list_df <- pair_list %>%str_split_fixed("__", 2) %>%as.data.frame(stringsAsFactors = FALSE) %>%filter(V1 %in% mRNA_id & V2 %in% miRNA_id)rm(pair_list)# 按核心数将任务切块
block_indices <- split(1:nrow(pair_list_df), cut(1:nrow(pair_list_df), n_cores, labels = FALSE))# 并行计算
results <- foreach(block = block_indices, .combine = rbind,.packages = c("dplyr")) %dopar% {block_result <- lapply(block, function(i) {tryCatch({mRNA_name <- pair_list_df$V1[i]miRNA_name <- pair_list_df$V2[i]x <- as.numeric(Esn_transcript_TPM[mRNA_name, ])y <- as.numeric(miRNAs_expressed_TPM[miRNA_name, ])if (all(is.na(x)) || all(is.na(y)) || sd(x, na.rm = TRUE) == 0 || sd(y, na.rm = TRUE) == 0) {return(data.frame(row = i, mRNA = mRNA_name, miRNA = miRNA_name, cor = NA, pvalue = NA))}test <- cor.test(x, y, method = "pearson")data.frame(row = i, mRNA = mRNA_name, miRNA = miRNA_name, cor = test$estimate, pvalue = test$p.value)}, error = function(e) {mRNA_name <- pair_list_df$V1[i]miRNA_name <- pair_list_df$V2[i]data.frame(row = i, mRNA = mRNA_name, miRNA = miRNA_name, cor = NA, pvalue = NA)})})do.call(rbind, block_result)
}# 添加结果回原始表
pair_list_df$cor <- results$cor
pair_list_df$pvalue <- results$pvalue# 可选:输出完整 results 表,包括 row、mRNA、miRNA、cor、pvalue
# View(results) 或 write.csv(results, "correlation_results.csv", row.names = FALSE)# 关闭并行线程
stopCluster(cl)
实例2 从基因gtf文件提取,将所有mRNA的位置分为100个2%的滑动窗口
library(dplyr)
library(tibble)# 可选:如果有染色体长度表 chr_len(命名向量),可用于上边界裁剪
# 例:chr_len <- c("NC_066509.1"=100000000, "NC_066510.1"=95000000, ...)
# 没有就注释掉下一行,以及下面代码中的 pmin(., chr_len[chr]) 那部分
# chr_len <- NULLsliding_windows <- tibble() # 直接用数据框累积# 固定上下游参数
UPDN_LEN <- 2000L
UPDN_WIN <- max(1L, floor(UPDN_LEN * 0.02)) # 40
UPDN_STEP <- max(1L, floor(UPDN_LEN * 0.01)) # 20for (i in seq_len(nrow(mRNA_gtfdata))) {chr <- mRNA_gtfdata$seqnames[i]s <- as.integer(mRNA_gtfdata$start[i])e <- as.integer(mRNA_gtfdata$end[i])strand <- as.character(mRNA_gtfdata$strand[i])gid <- as.character(mRNA_gtfdata$gene_id[i])# ========= 1) gene body =========len <- e - s + 1Lwin_len <- max(1L, floor(len * 0.02))step <- max(1L, floor(len * 0.01))if (len >= win_len) {b_starts <- seq.int(s, e - win_len + 1L, by = step)b_ends <- pmin(b_starts + win_len - 1L, e)# 1..100 相对编号(按区域起止映射;如需按转录方向,可把 frac 换成 strand-aware 的)b_centers <- (b_starts + b_ends) / 2b_frac <- (b_centers - s) / lenb_rank <- pmax(1L, pmin(100L, floor(b_frac * 100) + 1L))df_body <- tibble(seqnames = chr,win_start = b_starts,win_end = b_ends,gene_id = gid,strand = strand,region = "body",rank = b_rank)sliding_windows <- bind_rows(sliding_windows, df_body)}# ========= 2) upstream 2kb =========if (strand == "+") {us <- s - UPDN_LENue <- s - 1L} else {us <- e + 1Lue <- e + UPDN_LEN}# 下边界裁到 1,上边界如有 chr_len 可再裁一次us <- max(1L, us)# 若有 chr_len: ue <- if (is.null(chr_len)) ue else min(ue, as.integer(chr_len[chr]))if (ue - us + 1L >= UPDN_WIN) {u_starts <- seq.int(us, ue - UPDN_WIN + 1L, by = UPDN_STEP)u_ends <- pmin(u_starts + UPDN_WIN - 1L, ue)u_centers <- (u_starts + u_ends) / 2u_frac <- (u_centers - us) / (ue - us + 1L)u_rank <- pmax(1L, pmin(100L, floor(u_frac * 100) + 1L))df_up <- tibble(seqnames = chr,win_start = u_starts,win_end = u_ends,gene_id = gid,strand = strand,region = "upstream2k",rank = u_rank)sliding_windows <- bind_rows(sliding_windows, df_up)}# ========= 3) downstream 2kb =========if (strand == "+") {ds <- e + 1Lde <- e + UPDN_LEN} else {ds <- s - UPDN_LENde <- s - 1L}ds <- max(1L, ds)# 若有 chr_len: de <- if (is.null(chr_len)) de else min(de, as.integer(chr_len[chr]))if (de - ds + 1L >= UPDN_WIN) {d_starts <- seq.int(ds, de - UPDN_WIN + 1L, by = UPDN_STEP)d_ends <- pmin(d_starts + UPDN_WIN - 1L, de)d_centers <- (d_starts + d_ends) / 2d_frac <- (d_centers - ds) / (de - ds + 1L)d_rank <- pmax(1L, pmin(100L, floor(d_frac * 100) + 1L))df_down <- tibble(seqnames = chr,win_start = d_starts,win_end = d_ends,gene_id = gid,strand = strand,region = "downstream2k",rank = d_rank)sliding_windows <- bind_rows(sliding_windows, df_down)}print(i)
}# 结果预览
dplyr::glimpse(sliding_windows)
head(sliding_windows, 10)#### 多线程版本library(doParallel)
library(foreach)
library(dplyr)
library(tibble)# 可选:染色体长度(如果有就填,避免越界;没有就留 NULL)
# chr_len <- c("NC_066509.1"=123456789, ...)
chr_len <- NULL# 上/下游固定参数
UPDN_LEN <- 2000L
UPDN_WIN <- max(1L, floor(UPDN_LEN * 0.02)) # 40 bp
UPDN_STEP <- max(1L, floor(UPDN_LEN * 0.01)) # 20 bp# 并行环境
n_cores <- max(1, parallel::detectCores() - 1)
cl <- makeCluster(n_cores)
registerDoParallel(cl)sliding_windows <- foreach(i = seq_len(nrow(mRNA_gtfdata)),.combine = dplyr::bind_rows,.packages = c("dplyr","tibble")) %dopar% {chr <- mRNA_gtfdata$seqnames[i]s <- as.integer(mRNA_gtfdata$start[i])e <- as.integer(mRNA_gtfdata$end[i])strand <- as.character(mRNA_gtfdata$strand[i])gid <- as.character(mRNA_gtfdata$gene_id[i])out_list <- list()# 1) gene body:窗=2%len,步=1%len;rank 随转录方向(5'->3')递增len <- e - s + 1Lif (len > 0L) {win_len <- max(1L, floor(len * 0.02))step <- max(1L, floor(len * 0.01))if (len >= win_len) {b_starts <- seq.int(s, e - win_len + 1L, by = step)b_ends <- pmin(b_starts + win_len - 1L, e)centers <- (b_starts + b_ends) / 2# strand-aware:正链从 s->e,负链从 e->sfrac <- if (strand == "+") (centers - s)/len else (e - centers)/lenb_rank <- pmax(1L, pmin(100L, floor(frac * 100) + 1L))out_list$body <- tibble(seqnames = chr,win_start = b_starts,win_end = b_ends,gene_id = gid,strand = strand,region = "body",rank = b_rank)}}# 小工具:裁边(避免 <1;如提供 chr_len 再裁上界)clamp <- function(x, chr) {x <- pmax(1L, x)if (!is.null(chr_len) && !is.na(chr_len[chr])) x <- pmin(x, as.integer(chr_len[chr]))x}# 2) upstream 2kb(按链向定义)if (strand == "+") { us <- s - UPDN_LEN; ue <- s - 1L } else { us <- e + 1L; ue <- e + UPDN_LEN }us <- clamp(us, chr); ue <- clamp(ue, chr)if (ue - us + 1L >= UPDN_WIN) {u_starts <- seq.int(us, ue - UPDN_WIN + 1L, by = UPDN_STEP)u_ends <- pmin(u_starts + UPDN_WIN - 1L, ue)u_centers <- (u_starts + u_ends) / 2# 远端->近端 映射到 1..100u_frac <- (u_centers - us) / (ue - us + 1L)u_rank <- pmax(1L, pmin(100L, floor(u_frac * 100) + 1L))out_list$up <- tibble(seqnames = chr,win_start = u_starts,win_end = u_ends,gene_id = gid,strand = strand,region = "upstream2k",rank = u_rank)}# 3) downstream 2kb(按链向定义)if (strand == "+") { ds <- e + 1L; de <- e + UPDN_LEN } else { ds <- s - UPDN_LEN; de <- s - 1L }ds <- clamp(ds, chr); de <- clamp(de, chr)if (de - ds + 1L >= UPDN_WIN) {d_starts <- seq.int(ds, de - UPDN_WIN + 1L, by = UPDN_STEP)d_ends <- pmin(d_starts + UPDN_WIN - 1L, de)d_centers <- (d_starts + d_ends) / 2d_frac <- (d_centers - ds) / (de - ds + 1L)d_rank <- pmax(1L, pmin(100L, floor(d_frac * 100) + 1L))out_list$down <- tibble(seqnames = chr,win_start = d_starts,win_end = d_ends,gene_id = gid,strand = strand,region = "downstream2k",rank = d_rank)}dplyr::bind_rows(out_list)}stopCluster(cl)# 结果查看
dplyr::glimpse(sliding_windows)
head(sliding_windows, 10)#### 多线程版本块并行写法
# 任务分块(chunking)
library(doParallel)
library(foreach)
library(dplyr)
library(tibble)## 并行环境 ---------------------------------------------------------------
n_cores <- max(1, parallel::detectCores() - 1)
cl <- makeCluster(n_cores)
registerDoParallel(cl)## 可选:染色体长度(命名向量;没有就设为 NULL)
# chr_len <- c("NC_066509.1"=123456789, ...)
chr_len <- NULL## 上/下游固定参数 --------------------------------------------------------
UPDN_LEN <- 2000L
UPDN_WIN <- max(1L, floor(UPDN_LEN*0.02)) # 40bp
UPDN_STEP <- max(1L, floor(UPDN_LEN*0.01)) # 20bp## 将任务切块:每核一块(行号均分到 n_cores 份) --------------------------
idx_all <- seq_len(nrow(mRNA_gtfdata))
block_indices <- split(idx_all, cut(idx_all, n_cores, labels = FALSE))## 开撸:块并行,每块内部用 for 循环拼成一个 data.frame 再返回 --------------
sliding_windows <- foreach(blk = block_indices,.combine = dplyr::bind_rows,.multicombine = TRUE,.maxcombine = n_cores,.packages = c("dplyr","tibble")
) %dopar% {out_block <- vector("list", length(blk)) # 先预分配,块内少拷贝k <- 0Lfor (i in blk) {chr <- mRNA_gtfdata$seqnames[i]s <- as.integer(mRNA_gtfdata$start[i])e <- as.integer(mRNA_gtfdata$end[i])strand <- as.character(mRNA_gtfdata$strand[i])gid <- as.character(mRNA_gtfdata$gene_id[i])res_list <- list()## 1) body:窗=2%len,步=1%len;rank 随转录方向(5'->3')len <- e - s + 1Lif (len > 0L) {win_len <- max(1L, floor(len * 0.02))step <- max(1L, floor(len * 0.01))if (len >= win_len) {b_starts <- seq.int(s, e - win_len + 1L, by = step)b_ends <- pmin(b_starts + win_len - 1L, e)centers <- (b_starts + b_ends) / 2frac <- if (strand == "+") (centers - s)/len else (e - centers)/lenb_rank <- pmax(1L, pmin(100L, floor(frac*100) + 1L))res_list$body <- tibble(seqnames = chr,win_start = b_starts,win_end = b_ends,gene_id = gid,strand = strand,region = "body",rank = b_rank)}}## clamp:裁边clamp <- function(x) {x <- pmax(1L, x)if (!is.null(chr_len) && !is.na(chr_len[chr])) x <- pmin(x, as.integer(chr_len[chr]))x}## 2) upstream 2kbif (strand == "+") { us <- s - UPDN_LEN; ue <- s - 1L } else { us <- e + 1L; ue <- e + UPDN_LEN }us <- clamp(us); ue <- clamp(ue)if (ue - us + 1L >= UPDN_WIN) {u_starts <- seq.int(us, ue - UPDN_WIN + 1L, by = UPDN_STEP)u_ends <- pmin(u_starts + UPDN_WIN - 1L, ue)u_centers<- (u_starts + u_ends) / 2u_frac <- (u_centers - us) / (ue - us + 1L)u_rank <- pmax(1L, pmin(100L, floor(u_frac*100) + 1L))res_list$up <- tibble(seqnames = chr,win_start = u_starts,win_end = u_ends,gene_id = gid,strand = strand,region = "upstream2k",rank = u_rank)}## 3) downstream 2kbif (strand == "+") { ds <- e + 1L; de <- e + UPDN_LEN } else { ds <- s - UPDN_LEN; de <- s - 1L }ds <- clamp(ds); de <- clamp(de)if (de - ds + 1L >= UPDN_WIN) {d_starts <- seq.int(ds, de - UPDN_WIN + 1L, by = UPDN_STEP)d_ends <- pmin(d_starts + UPDN_WIN - 1L, de)d_centers<- (d_starts + d_ends) / 2d_frac <- (d_centers - ds) / (de - ds + 1L)d_rank <- pmax(1L, pmin(100L, floor(d_frac*100) + 1L))res_list$down <- tibble(seqnames = chr,win_start = d_starts,win_end = d_ends,gene_id = gid,strand = strand,region = "downstream2k",rank = d_rank)}k <- k + 1Lout_block[[k]] <- dplyr::bind_rows(res_list)}# 把该块内所有基因的窗口拼成一个 data.frame 返回dplyr::bind_rows(out_block[seq_len(k)])
}stopCluster(cl)# 看看结果
dplyr::glimpse(sliding_windows)
head(sliding_windows, 10)