我处于一种情况,我想将我的工作流分散到可变数量的块中,而这些块我之前都不知道。也许通过具体解释这个问题最容易:
有人将bcl2fastq
与no-lane-splitting
选项一起使用的FASTQ文件交给了我。我想根据泳道拆分这些文件,分别映射每个泳道,然后再次收集所有内容。但是,我事先不知道车道的数量。
理想情况下,我想要这样的解决方案,
rule split_fastq_file: (...) # results in N FASTQ files
rule map_fastq_file: (...) # do this N times
rule merge_bam_files: (...) # merge the N BAM files
但是我不确定这是可能的。 expand
函数要求我知道通道数,也无法看到如何为此使用通配符。
我应该说我对Snakemake还是比较陌生的,我可能完全误解了Snakemake的工作原理。我花了一些时间习惯于专注于输出文件,然后进行反向工作,以“颠倒”的方式思考问题。
一种选择是在分割fastq时使用checkpoint
,以便您可以在以后动态地重新评估DAG,以获得最终的通道。
这是逐步的MWE:
# Requires Python 3.6+ for f-strings, Snakemake 5.4+ for checkpoints
import pathlib
import random
random.seed(1)
rule make_fastq:
output:
fastq = touch("input/{sample}.fastq")
checkpoint
,而不是规则,以便以后可以访问结果。另外,我们在这里将输出声明为特定于样本的目录,以便以后可以在其中查找以获得已创建的泳道。 checkpoint split_fastq:
input:
fastq = rules.make_fastq.output.fastq
output:
lane_dir = directory("temp/split_fastq/{sample}")
run:
pathlib.Path(output.lane_dir).mkdir(exist_ok=True)
n_lanes = random.randrange(1, 10)-
lane_numbers = random.sample(range(1, 10), k = n_lanes)
for lane_number in lane_numbers:
path = pathlib.Path(output.lane_dir) / f"L00{lane_number}.fastq"
path.touch()
rule map_fastq:
input:
fastq = "temp/split_fastq/{sample}/L00{lane_number}.fastq"
output:
bam = "temp/map_fastq/{sample}/L00{lane_number}.bam"
run:
bam = pathlib.Path(output.bam)
bam.parent.mkdir(exist_ok=True)
bam.touch()
split_fastq
中创建的通道,以便我们可以对它们进行动态expand
。我们在中间处理步骤链中的最后一条规则(在本例中为expand
)上执行map_fastq
,以便我们请求正确的输入。def get_bams(wildcards):
lane_dir = checkpoints.split_fastq.get(**wildcards).output[0]
lane_numbers = glob_wildcards(f"{lane_dir}/L00{{lane_number}}.fastq").lane_number
bams = expand(rules.map_fastq.output.bam, **wildcards, lane_number=lane_numbers)
return bams
rule merge_bam: input: get_bams output: bam = "temp/merge_bam/{sample}.bam" shell: "cat {input} > {output.bam}"
此示例运行,并且使用
random.seed(1)
恰好创建三个通道(l001
,l002
和l005
)。
[如果您不想使用checkpoint
,我想您可以通过为merge_bam
创建一个输入函数来打开打开原始输入fastq,扫描读取的名称以获得车道信息,并预测输入文件应该是。但是,这似乎不够健壮。