Reputation: 977
I'm currently writing a pipeline that looks like this (code for the minimal example is below; the input files are just blank files whose names are in the SAMPLES list in the example).
What I would like is: if a sample fails in one of the first two steps (the minimal example is set up to make sample1 fail on rule two), keep going with all the following steps just as if that sample were not there (meaning rules gather_and_do_something and split_final would run only on sample2 and sample3 here).
I'm already using the --keep-going option to continue with independent jobs, but I have trouble defining the input for the common rule so that it ignores the files that were on a failing path.
SAMPLES = ["sample1", "sample2", "sample3"]

rule all:
    input:
        expand("{sample}_final", sample=SAMPLES)

rule one:
    input:
        "{sample}"
    output:
        "{sample}_ruleOne"
    shell:
        "touch {output}"

rule two:
    input:
        rules.one.output
    output:
        "{sample}_ruleTwo"
    run:
        if input[0] != 'sample1_ruleOne':
            with open(output[0], 'w') as fh:
                fh.write(f'written {output[0]}')

rule gather_and_do_something:
    input:
        expand(rules.two.output, sample=SAMPLES)
    output:
        'merged'
    run:
        words = []
        for f in input:
            with open(f, 'r') as fh:
                words.append(next(fh))
        if len(input):
            with open(output[0], 'w') as fh:
                fh.write('\n'.join(words))

rule split_final:
    input:
        rules.gather_and_do_something.output
    output:
        '{sample}_final'
    shell:
        'touch {output}'
I tried writing a custom function to use as an input, but that does not seem to work...
def get_files(wildcards):
    import os
    return [f for f in expand(rules.two.output, sample=SAMPLES) if f in os.listdir(os.getcwd())]

rule gather_and_do_something:
    input:
        unpack(get_files)
    output:
        'merged'
    run:
        words = []
        for f in input:
            with open(f, 'r') as fh:
                words.append(next(fh))
        if len(input):
            with open(output[0], 'w') as fh:
                fh.write('\n'.join(words))
Upvotes: 3
Views: 501
Reputation: 421
By default, Snakemake's DAG is static, so it will expect all of the input files requested by the all rule (and their dependencies) to be present. You can get around this requirement by defining checkpoints and corresponding input functions, which can alter the DAG dynamically based on the output of the checkpoint rule(s).
Here's an updated example that turns rule two into a checkpoint and uses input functions during the aggregation steps (gather_and_do_something and all) to skip samples that fail rule two. The updated example treats an empty output file as a failure (the test os.stat(fn).st_size > 0 keeps only passing samples), but you could use a different test instead.
import shlex
import os

SAMPLES = ["sample1", "sample2", "sample3"]

def all_input(wldc):
    files = []
    for sample in SAMPLES:
        fn = checkpoints.two.get(sample=sample).output[0]
        if os.stat(fn).st_size > 0:
            files.append('{sample}_final'.format(sample=sample))
    return files

rule all:
    input:
        all_input

rule one:
    input:
        "{sample}"
    output:
        "{sample}_ruleOne"
    shell:
        "touch {output}"

checkpoint two:
    input:
        rules.one.output
    output:
        "{sample}_ruleTwo"
    run:
        with open(output[0], 'w') as fh:
            if input[0] != 'sample1_ruleOne':
                fh.write(f'written {output[0]}')

def gather_input(wldc):
    files = []
    for sample in SAMPLES:
        fn = checkpoints.two.get(sample=sample).output[0]
        if os.stat(fn).st_size > 0:
            files.append("{sample}_ruleTwo".format(sample=sample))
    return files

rule gather_and_do_something:
    input:
        gather_files = gather_input,
    output:
        'merged'
    params:
        gather_files = lambda wldc, input: ' '.join(map(shlex.quote, input.gather_files))  # Handle samples/files with spaces
    shell:
        """
        cat {params.gather_files} > '{output}'
        """

rule split_final:
    input:
        rules.gather_and_do_something.output
    output:
        '{sample}_final'
    shell:
        'touch {output}'
With these changes, rule gather_and_do_something will only read rule two output from passing samples (sample2 and sample3), and rule split_final will only run jobs for samples 2 and 3.
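If the empty-file convention doesn't fit, the same input functions can filter on any other test. Below is a minimal sketch, not part of the answer above: the helper name passing_samples and the "FAILED" marker are assumptions, and the checkpoint would have to write an explicit FAILED line for bad samples instead of leaving the file empty. The helper must be called from inside an input function so the checkpoint is triggered and resolved first.

def passing_samples():
    # Hypothetical helper (assumed name): call only from an input function,
    # so checkpoints.two.get() can trigger the checkpoint jobs per sample.
    passing = []
    for sample in SAMPLES:
        fn = checkpoints.two.get(sample=sample).output[0]
        with open(fn) as fh:
            first_line = fh.readline().strip()
        if first_line != "FAILED":  # assumed marker written by checkpoint two on failure
            passing.append(sample)
    return passing

def all_input(wldc):
    return expand("{sample}_final", sample=passing_samples())

def gather_input(wldc):
    return expand("{sample}_ruleTwo", sample=passing_samples())

The rest of the Snakefile would stay the same; only the test inside the input functions changes.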
Upvotes: 1
Reputation: 9062
if a sample fails in one of the first two steps, keep going with all the next steps just as if it were not there (meaning it would do the rules gather_and_do_something and split_final only on sample2 and sample3 here).
In the rule(s) that are allowed to fail, I would put a try/except to catch the failure in a controlled way and write a dummy file for the failing samples. Downstream rules will then recognize the dummy file and behave as if the sample is not there, for example by writing another dummy file.
I don't know if it annoys you to have these dummy files kicking around... In my opinion it is not a bad thing, because they signal that a sample failed in an acceptable way.
Here is some example pseudocode:
import os

rule some_can_fail:
    input:
        '{sample}.txt',
    output:
        '{sample}.two',
    run:
        try:
            ...  # do the work for this sample and write {output}; may raise on failure
        except Exception:
            # Write an empty file (or a file with some message) to indicate
            # that this sample failed
            shell('touch {output}')

rule gather:
    input:
        expand('{sample}.two', sample=SAMPLES),
    output:
        'merged.txt',
    run:
        with open(output[0], 'w') as out:
            for fin in input:
                # Ignore failed samples (empty dummy files)
                if os.path.getsize(fin) > 0:
                    # Read fin and append it to the output file
                    with open(fin) as fh:
                        out.write(fh.read())

rule split_final:
    input:
        merged= 'merged.txt',
        two= '{sample}.two',
    output:
        '{sample}_final',
    run:
        if os.path.getsize(input.two) == 0:
            # As above, write an empty file or a file with a meaningful message
            shell('touch {output}')
        else:
            ...  # Do something with the merged input and write to output
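If several rules end up needing the same check, the emptiness test can be factored into a single helper near the top of the Snakefile. This is only a sketch under the assumption that a zero-byte file is the agreed failure marker; the name sample_failed is not part of the answer above.

import os

def sample_failed(path):
    # Hypothetical helper: a zero-byte dummy file marks a sample that
    # failed in an upstream rule (the convention used in the pseudocode above)
    return os.path.getsize(path) == 0

The gather and split_final rules would then call sample_failed(fin) and sample_failed(input.two) instead of repeating the size check.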
Upvotes: 0