I plan to create a Snakemake script that runs on a large dataset. The script will pre-process, filter, and then process the samples, but I don't know how to implement this. Here is the basic structure of my script:
sample = ['A', 'B', 'C']

rule all:
    input:
        expand('output/pre_process/{sample}.txt', sample=sample)
        # I am not sure how to add the input

# just a toy run
rule pre_process:
    output:
        'output/pre_process/{sample}.txt'
    shell:
        """
        echo "" > {output}
        """

rule filter:
    input:
        expand('output/pre_process/{sample}.txt', sample=sample)
    output:
        # all samples that pass the filter end up in this folder, one file per sample
        directory('output/filter')
    shell:
        """
        # toy run
        cp output/pre_process/{{A,B}}.txt output/filter/
        """

rule process:
    input:
        # I need to process the samples in output/filter one by one
    output:
        'output/data/{sample}.txt'
    shell:
        """
        # just example, not run
        echo "" > {output}
        """
Note: unless the filter step has written its output files, there is no way to know which samples the later process step will need.
I think what you need is a checkpoint, which re-evaluates the DAG after the filter step. Would something like this work:
import os

def get_file_names(wildcards):
    ck_output = checkpoints.filter.get(**wildcards).output[0]
    SMP, = glob_wildcards(os.path.join(ck_output, "{sample}.txt"))
    return expand(os.path.join(ck_output, "{SAMPLE}.txt"), SAMPLE=SMP)

def getFinalOut(wildcards):
    ck_output = checkpoints.filter.get(**wildcards).output[0]
    SMP, = glob_wildcards(os.path.join(ck_output, "{sample}.txt"))
    return expand(os.path.join("output/data/", "{SAMPLE}.txt"), SAMPLE=SMP)

rule all:
    input:
        expand('output/pre_process/{sample}.txt', sample=['A', 'B', 'C']),
        getFinalOut

# just a toy run
rule pre_process:
    output:
        'output/pre_process/{sample}.txt'
    shell:
        """
        echo "" > {output}
        """

checkpoint filter:
    input:
        expand('output/pre_process/{sample}.txt', sample=['A', 'B', 'C'])
    output:
        directory('output/filter')
    shell:
        """
        # toy run
        mkdir {output}
        cp output/pre_process/{{A,B}}.txt {output}
        """

rule process:
    input:
        get_file_names
    output:
        'output/data/{sample}.txt'
    shell:
        """
        # just example, not run
        echo "" > {output}
        """
Disclaimer: I have never used checkpoints myself; this is only based on the post here. So it's best to double-check.