Plopp
Plopp

Reputation: 977

Snakemake ignore failed path and redefine inputs for a common rule

I'm currently writing a pipeline that looks like this (code for the minimal example is below, the input files are just blank files which names are in the SAMPLES list in the example).

enter image description here

What I would like, is, if a sample fails in one of the first two steps (minimal example is set to make sample1 fail on rule two), keep going with all the next steps just like it not being there (meaning it would do the rule gather_and_do_something and split_final only on sample2 and sample3 here).

I'm already using the --keep-going option to go on with independant jobs but I have trouble defining the input for the common rule and make it ignore the files that were in a failing path.

SAMPLES = ["sample1", "sample2", "sample3"]

rule all:
    input:
        expand("{sample}_final", sample=SAMPLES)

rule one:
    input:
        "{sample}"
    output:
        "{sample}_ruleOne"
    shell:
        "touch {output}"

rule two:
    input:
        rules.one.output
    output:
        "{sample}_ruleTwo"
    run:
        if input[0] != 'sample1_ruleOne':
            with open(output[0], 'w') as fh:
                fh.write(f'written {output[0]}')

rule gather_and_do_something:
    input:
        expand(rules.two.output, sample=SAMPLES)
    output:
        'merged'
    run:
        words = []
        for f in input:
            with open(f, 'r') as fh:
                words.append(next(fh))
        if len(input):
            with open(output[0], 'w') as fh:
                fh.write('\n'.join(words))

rule split_final:
    input:
        rules.gather_and_do_something.output
    output:
        '{sample}_final'
    shell:
        'touch {output}'

I tried writing some custom function to use as an input but that does not seems to work...

def get_files(wildcards):
    import os
    return [f for f in expand(rules.two.output, sample=SAMPLES) if f in os.listdir(os.getcwd())]

rule gather_and_do_something:
    input:
        unpack(get_files)
    output:
        'merged'
    run:
        words = []
        for f in input:
            with open(f, 'r') as fh:
                words.append(next(fh))
        if len(input):
            with open(output[0], 'w') as fh:
                fh.write('\n'.join(words))

Upvotes: 3

Views: 501

Answers (2)

dofree
dofree

Reputation: 421

By default, snakemake's DAG is static, so it will expect all of the input files requested by the all rule (and their dependencies) to be present. You can get around this requirement by defining checkpoints and corresponding input functions, which can alter the DAG dynamically based on the output of the checkpoint rule(s).

Here's an updated example that turns rule two into a checkpoint and uses input functions during the aggregation steps (gather_and_do_something and all) to skip samples that fail rule two. The updated example defines an empty output file as failure (os.stat(fn).st_size > 0), but you could use a different test instead.

import shlex
import os

SAMPLES = ["sample1", "sample2", "sample3"]

def all_input(wldc):
    files = []
    for sample in SAMPLES:
        fn = checkpoints.two.get(sample=sample).output[0]
        if os.stat(fn).st_size > 0:
            files.append('{sample}_final'.format(sample=sample))
    return files

rule all:
    input:
        all_input

rule one:
    input:
        "{sample}"
    output:
        "{sample}_ruleOne"
    shell:
        "touch {output}"

checkpoint two:
    input:
        rules.one.output
    output:
        "{sample}_ruleTwo"
    run:
        with open(output[0], 'w') as fh:
            if input[0] != 'sample1_ruleOne':
                fh.write(f'written {output[0]}')

def gather_input(wldc):
    files = []
    for sample in SAMPLES:
        fn = checkpoints.two.get(sample=sample).output[0]
        if os.stat(fn).st_size > 0:
            files.append("{sample}_ruleTwo".format(sample=sample))
    return files

rule gather_and_do_something:
    input:
        gather_files = gather_input,
    output:
        'merged'
    params:
        gather_files = lambda wldc, input: ' '.join(map(shlex.quote, input.gather_files)) # Handle samples/files with spaces
    shell:
        """
        cat {params.gather_files} > '{output}'
        """

rule split_final:
    input:
        rules.gather_and_do_something.output
    output:
        '{sample}_final'
    shell:
        'touch {output}'

With these changes, rule gather_and_do_something will only read rule two output from passing samples (sample2 and sample3) and rule split_final will only run jobs for samples 2 and 3.

Upvotes: 1

dariober
dariober

Reputation: 9062

if a sample fails in one of the first two steps, keep going with all the next steps just like it not being there (meaning it would do the rule gather_and_do_something and split_final only on sample2 and sample3 here).

In the rule(s) that are allowed to fail I would put a try/except to catch the fail in a controlled way and to write a dummy file for the failing samples. Downstream rules will then recognize the dummy file and behave as if the sample is not there, for example by writing another dummy file.

I don't know if it annoys you to have these dummy files kicking around... In my opinion is not a bad thing because they signal that a sample failed in an acceptable way.

Here some example pseudocode:

rule some_can_fail:
    input:
        '{sample}.txt',
    output:
        '{sample}.two',
    run:
        try:
            ...
        except:
            # Write an empty file or a file with some message that indicate it
            # failed
            touch(output)


rule gather:
    input:
        expand('{sample}.two', sample= SAMPLES),
    output:
        'merged.txt',
    run:
        for fin in input:
            # Ignore failed samples
            if fin is not empty:
                # Read fin and append to output file


rule split_final:
    input:
        merged= 'merged.txt',
        two= '{sample}.two',
    output:
        '{sample}_final',
    run:
        if input.two is empty:
            # As above, write an empty file or a file with a meanigful message
            touch(output)
        else:
            # Do something and write to output

Upvotes: 0

Related Questions