rna_seq_pipeline.py-python002
python rna_seq_pipeline.py -r genome.fasta -g genome.gff -s samples.txt -o results_dir -t 8 -j 4
1. 准备工作
首先确保以下工具已经安装并在系统的 $PATH
中:
hisat2: 用于比对参考基因组。
stringtie: 用于转录本组装和定量。
samtools: 用于处理BAM文件。
gffread: 用于转换GFF到GTF。
extract_exons.py, extract_splice_sites.py: 用于提取外显子和剪接位点。
2. 输入文件
ref: 参考基因组的FASTA文件。
gff: 参考注释的GFF3文件。
samples: 包含样本信息的文件,每行格式为
group sampleID fq1_file fq2_file
。
3. 参数说明
-r
: 参考基因组FASTA文件路径。-g
: 参考注释GFF3文件路径。-s
: 样本信息文件路径。-o
: 输出目录路径,默认为RNAseq_analysis_out
。-t
: 用于比对的线程数,默认为 4。-j
: 用于排序BAM文件的线程数,默认为 2。-F
: 强制删除旧分析结果。-m
: 最小内含子长度,默认为 20。-M
: 最大内含子长度,默认为 15000。-A
: 是否对新增样本进行比对。-i
: 是否忽略新转录本的组装,yes
表示仅估算已知转录本的丰度,no
表示进行转录本的组装。
import sys, os, subprocess
import argparse
import shutil
from concurrent.futures import ThreadPoolExecutor# required tools: hisat2, stringtie...
binaries = {'alignment': ['hisat2', 'hisat2-build', 'extract_exons.py', 'extract_splice_sites.py','samtools', 'gffread' ],'expression': ['stringtie', 'gffcompare']
}# Check if file exists
def fileExist(fileName):return os.path.exists(fileName)# Check if a command is executable
def is_exe(fpath):return os.path.isfile(fpath) and os.access(fpath, os.X_OK)def which(program):fpath, fname = os.path.split(program)if fpath:if is_exe(program):return programelse:for path in os.environ["PATH"].split(os.pathsep):exe_file = os.path.join(path, program)if is_exe(exe_file):return exe_filereturn None# Check if binaries exist in system
def checkBinaries(binaries):for bins in binaries.values():for b in bins:if not which(b):raise Exception(f'{b} command not found.')# Run command
def run(cmd):try:subprocess.check_call(cmd, shell=True)except subprocess.CalledProcessError as e:print(f"Error occurred while running command: {cmd}")print(f"Error: {e}")sys.exit(1)# Check if files exist
def check_files_exist(files):for file in files:if not fileExist(file):raise Exception(f'ERROR: {file} cannot be found!')# Get sample information from file
def getSamples(sampleFile):samples = {}with open(sampleFile, 'r') as f:for line in f:grpName, sampleID, fq1, fq2 = line.strip().split()if sampleID in samples:raise Exception(f'Duplicated sample id: {sampleID}')samples[sampleID] = [grpName, fq1, fq2]return samples# Create directory if not exists, or force remove and create
def mkdir(dirName, force=False):if fileExist(dirName):if force:try:if len(os.listdir(dirName)) > 0:shutil.rmtree(dirName)else:os.removedirs(dirName)os.makedirs(dirName)except IOError:print(f"Cannot remove folder: {dirName}")sys.stdout.flush()else:raise Exception("Output folder exists.")else:os.makedirs(dirName)# Prepare directories and check files
def prepare(args):check_files_exist([args.ref, args.gff, args.samples])samples = getSamples(args.samples)for s in samples:for fq in samples[s][1:]:fqList = fq.split(',')for f in fqList:check_files_exist([f])# create output directories if necessaryif not args.align_new_samples:mkdir(args.outdir, args.force)mkdir(args.outdir + '/BAMs')mkdir(args.outdir + '/ballgown/')for sampleName in samples.keys():mkdir(args.outdir + '/ballgown/' + sampleName)# Alignment function
def alignment(args):hisat2, hisat2_build, extract_exon, extract_splice_site, samtools, gffread = (which(b) for b in binaries['alignment'])gtf = args.outdir + '/genome.gtf'# Convert GFF to GTFcmd = f"{gffread} {args.gff} -T -o {gtf}"if not args.align_new_samples:run(cmd)spliceSites = args.outdir + '/genome.ss'exons = args.outdir + '/genome.exons'# Extract splice sites and exonscmd = f"{extract_splice_site} {gtf} > {spliceSites}"run(cmd)cmd = f"{extract_exon} {gtf} > {exons}"run(cmd)# Build HISAT2 indexcmd = f"{hisat2_build} {args.ref} {args.outdir}/genome"if args.build_index_with_annotation:cmd = f"{hisat2_build} --ss {spliceSites} --exon {exons} {args.ref} {args.outdir}/genome"run(cmd)# Perform alignment for all samples using ThreadPoolExecutorsamples = getSamples(args.samples)with ThreadPoolExecutor(max_workers=int(args.threads)) as executor:executor.map(run_alignment_for_sample, samples.keys(), [samples] * len(samples), [args] * len(samples))print("Alignment finished!")sys.stdout.flush()# Perform alignment for each sample
def run_alignment_for_sample(sample, samples, args):hisat2, samtools = (which(b) for b in ['hisat2', 'samtools'])grp, fq1, fq2 = samples[sample]outBam = args.outdir + '/BAMs/' + sample + '.bam'cmd = f"{hisat2} -p {args.threads} --min-intronlen {args.min_intronlen} --max-intronlen {args.max_intronlen} --dta -x {args.outdir}/genome -1 {fq1} -2 {fq2} | {samtools} sort -@ {args.sort_threads} -o {outBam} -O BAM && {samtools} index {outBam}"run(cmd)# Expression analysis
def expression(args):stringtie, gffcompare = (which(b) for b in binaries['expression'])samples = getSamples(args.samples)refGtf = args.outdir + '/genome.gtf'mergedGtf = args.outdir + '/genome_sample_merged.gtf'if args.ignore_new_trans == 'yes':for s in samples:inputBam = args.outdir + '/BAMs/' + s + '.bam'outGtf = args.outdir + '/ballgown/' + s + '/' + s + '.gtf'geneAbundance = args.outdir + '/ballgown/' + s + '/gene_data.ctab'cmd = f"{stringtie} {inputBam} -G {refGtf} -o {outGtf} -e -B -A {geneAbundance} -p {args.threads}"run(cmd)elif args.ignore_new_trans == 'no':for s in samples:inputBam = args.outdir + '/BAMs/' + s + '.bam'outGtf = args.outdir + '/ballgown/' + s + '/' + s + '.gtf'cmd = f"{stringtie} -p {args.threads} -G {refGtf} -o {outGtf} -l {s} {inputBam}"run(cmd)cmd = f"ls {args.outdir}/ballgown/*/*gtf > sample_GTF.list"run(cmd)cmd = f"{stringtie} --merge -p {args.threads} -G {refGtf} -o {mergedGtf} sample_GTF.list"run(cmd)compareResultPrefix = args.outdir + '/genome_sample_merged'cmd = f"{gffcompare} -r {refGtf} -G -o {compareResultPrefix} {mergedGtf}"run(cmd)for s in samples:inputBam = args.outdir + '/BAMs/' + s + '.bam'outGtf = args.outdir + '/ballgown/' + s + '/' + s + '.gtf'geneAbundance = args.outdir + '/ballgown/' + s + '/gene_data.ctab'cmd = f"{stringtie} {inputBam} -G {mergedGtf} -o {outGtf} -e -B -A {geneAbundance} -p {args.threads}"run(cmd)# Argument parsing
def getArgs(argList):parser = argparse.ArgumentParser(description="A Python wrapper for RNA-seq data analysis based on HISAT2 + StringTie + Ballgown pipeline.")parser.add_argument('-r', '--ref', help="Reference genome fasta", required=True)parser.add_argument('-g', '--gff', help="Reference annotation GFF3", required=True)parser.add_argument('-s', '--samples', help="Sample information file", required=True)parser.add_argument('-o', '--outdir', help="Output folder", default="RNAseq_analysis_out")parser.add_argument('-t', '--threads', help="Number of threads for alignment", default='4')parser.add_argument('-j', '--sort_threads', help="Threads for samtools sorting", default='2')parser.add_argument('-F', '--force', help="Force remove old analysis results", action="store_true")parser.add_argument('-m', '--min_intronlen', help="Min intron length", default='20')parser.add_argument('-M', '--max_intronlen', help="Max intron length", default='15000')parser.add_argument('-A', '--align_new_samples', help="Align new samples", action="store_true")parser.add_argument('-i', '--ignore_new_trans', help="Do not assemble new transcripts", choices=['yes', 'no'], default='yes')return parser.parse_args(argList)# Main function
def main(argList):checkBinaries(binaries)args = getArgs(argList)prepare(args)alignment(args)expression(args)if __name__ == "__main__":main(sys.argv[1:])