I want to use checkpoints in Snakemake to recover output files that are unknown before execution. I followed the example here: https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#data-dependent-conditional-execution. However, if the output contains more than one wildcard in its path, the function used to recover the checkpoint output throws an error.
I tried to create a small, simplified example based on the one provided in the documentation.
I assume I need to adapt the aggregate_input function to accept the sample wildcard. Can anyone advise?
import os

SAMPLE = ["A", "B", "C", "D", "E"]

# a target rule to define the desired final output
rule all:
    input:
        "aggregated.txt"

# the checkpoint that shall trigger re-evaluation of the DAG;
# a number of files is created in a defined directory
checkpoint somestep:
    output:
        directory("results/{sample}")
    shell:
        '''
        mkdir -p results
        mkdir results/{wildcards.sample}
        cd results/{wildcards.sample}
        for i in 1 2 3; do touch $i.txt; done
        '''

# input function for rule aggregate; returns paths to all files produced by the checkpoint 'somestep'
def aggregate_input(wildcards):
    checkpoint_output = checkpoints.somestep.get(**wildcards).output[0]
    return expand(
        "results/{sample}/{i}.txt",
        sample=wildcards.sample,
        i=glob_wildcards(os.path.join(checkpoint_output, "{i}.txt")).i,
    )

rule aggregate:
    input:
        aggregate_input
    output:
        "aggregated.txt"
    shell:
        "cat {input} > {output}"
This produces the following error:
InputFunctionException in rule aggregate in file /gpfs/nhmfsa/bulk/share/data/mbl/share/workspaces/groups/clarkgroup/oliw/test_snakemake_checkpoint/Snakefile, line 26:
Error:
WorkflowError:
Missing wildcard values for sample
Wildcards:
Traceback:
File "/gpfs/nhmfsa/bulk/share/data/mbl/share/workspaces/groups/clarkgroup/oliw/test_snakemake_checkpoint/Snakefile", line 23, in aggregate_input
I think the problem here is that when checkpoints.somestep.get(**wildcards) is called, wildcards.sample is undefined, because the sample wildcard is not defined in the aggregate rule. You can work around this like so:
import os

SAMPLE = ["A", "B", "C", "D", "E"]

# a target rule to define the desired final output
rule all:
    input:
        "aggregated.txt",

# the checkpoint that shall trigger re-evaluation of the DAG;
# a number of files is created in a defined directory
checkpoint somestep:
    output:
        directory("results/{sample}"),
    shell:
        """
        mkdir -p results
        mkdir results/{wildcards.sample}
        cd results/{wildcards.sample}
        for i in 1 2 3; do touch $i.txt; done
        """

# input function for rule aggregate; returns paths to all files produced by the checkpoint 'somestep'
def aggregate_input(wildcards):
    out = []
    for sample in SAMPLE:
        checkpoint_output = checkpoints.somestep.get(sample=sample).output[0]
        out.extend(
            expand(
                "results/{sample}/{i}.txt",
                sample=sample,
                i=glob_wildcards(os.path.join(checkpoint_output, "{i}.txt")).i,
            )
        )
    return out

rule aggregate:
    input:
        aggregate_input,
    output:
        "aggregated.txt",
    shell:
        "cat {input} > {output}"
From my experience with checkpoints, I would typically do something like this instead:
import os

SAMPLE = ["A", "B", "C", "D", "E"]

# a target rule to define the desired final output
rule all:
    input:
        "aggregated.txt",

# the checkpoint that shall trigger re-evaluation of the DAG;
# a number of files is created in a defined directory
checkpoint somestep:
    output:
        directory("results/{sample}"),
    shell:
        """
        mkdir -p results
        mkdir results/{wildcards.sample}
        cd results/{wildcards.sample}
        for i in 1 2 3; do touch $i.txt; done
        """

# input function for rule aggregate_per_sample; returns paths to all files produced by the checkpoint 'somestep'
def aggregate_input(wildcards):
    checkpoint_output = checkpoints.somestep.get(**wildcards).output[0]
    return expand(
        "results/{sample}/{i}.txt",
        sample=wildcards.sample,
        i=glob_wildcards(os.path.join(checkpoint_output, "{i}.txt")).i,
    )

rule aggregate_per_sample:
    input:
        aggregate_input,
    output:
        "{sample}_aggregated.txt",
    shell:
        "cat {input} > {output}"

rule aggregate_samples:
    input:
        expand("{sample}_aggregated.txt", sample=SAMPLE),
    output:
        "aggregated.txt",
    shell:
        "cat {input} > {output}"