hermidalc

Reputation: 568

Snakemake checkpoint outputs an unknown number of files: how to skip aggregation and instead have rules that perform actions on individual files?

Thanks for any help ahead of time.

I'm trying to use Snakemake's checkpoint functionality to produce an unknown number of files in a directory, which I've gotten to work using the pattern described in the docs. However, I don't want any kind of aggregation rule afterwards; instead, I want rules that perform actions on each individual file (inherently in parallel via wildcards).

Here's a simple reproducible example of my problem:

from os.path import join


rule all:
    input:
        "aggregated.txt",


checkpoint create_gzip_file:
    output:
        directory("my_directory/"),
    shell:
        """
        mkdir my_directory/
        cd my_directory
        for i in 1 2 3; do gzip < /dev/null > $i.txt.gz; done
        """


rule gunzip_file:
    input:
        join("my_directory", "{i}.txt.gz"),
    output:
        join("my_directory", "{i}.txt"),
    shell:
        """
        gunzip -c {input} > {output}
        """


def gather_gunzip_input(wildcards):
    out_dir = checkpoints.create_gzip_file.get(**wildcards).output[0]
    i = glob_wildcards(join(out_dir, "{i}.txt.gz"))
    return expand(f"{out_dir}/{{i}}", i=i)


rule aggregate:
    input:
        gather_gunzip_input,
    output:
        "aggregated.txt",
    shell:
        "cat {input} > {output}"

I'm getting the following error:

$ snakemake --printshellcmds --cores all
Building DAG of jobs...
Using shell: /usr/bin/bash
Provided cores: 16
Rules claiming more threads will be scaled down.
Job stats:
job                 count    min threads    max threads
----------------  -------  -------------  -------------
aggregate               1              1              1
all                     1              1              1
create_gzip_file        1              1              1
total                   3              1              1

Select jobs to execute...

[Wed Jul 13 14:57:09 2022]
checkpoint create_gzip_file:
    output: my_directory
    jobid: 2
    reason: Missing output files: my_directory
    resources: tmpdir=/tmp
Downstream jobs will be updated after completion.


        mkdir my_directory/
        cd my_directory
        for i in 1 2 3; do gzip < /dev/null > $i.txt.gz; done
        
[Wed Jul 13 14:57:09 2022]
Finished job 2.
1 of 3 steps (33%) done
MissingInputException in line 20 of /home/hermidalc/projects/github/hermidalc/test/Snakefile:
Missing input files for rule gunzip_file:
    output: my_directory/['1', '2', '3'].txt
    wildcards: i=['1', '2', '3']
    affected files:
        my_directory/['1', '2', '3'].txt.gz


Answers (1)

hermidalc

I had a subtle bug (one that didn't trigger any syntax check or other error) that was causing the seemingly unrelated MissingInputException. The glob_wildcards line:

i = glob_wildcards(join(out_dir, "{i}.txt.gz"))

needs a trailing comma, which tuple-unpacks the namedtuple returned by glob_wildcards:

i, = glob_wildcards(join(out_dir, "{i}.txt.gz"))

or

i = glob_wildcards(join(out_dir, "{i}.txt.gz")).i
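The difference is easy to see in plain Python: glob_wildcards returns a namedtuple with one list-valued field per wildcard in the pattern, simulated below with collections.namedtuple.

```python
from collections import namedtuple

# Simulated return value of glob_wildcards: a namedtuple with one
# list-valued field per wildcard in the pattern (here just "i").
Wildcards = namedtuple("Wildcards", ["i"])
matches = Wildcards(i=["1", "2", "3"])

# Without unpacking, `i` is the whole namedtuple. Iterating it (as
# expand() does) yields a single element -- the entire list -- which
# gets stringified into the bogus path from the error message:
i = matches
print(["my_directory/{}.txt.gz".format(v) for v in i])
# ["my_directory/['1', '2', '3'].txt.gz"]

# With the trailing comma, tuple unpacking extracts the list itself,
# so each value expands to its own path:
i, = matches
print(["my_directory/{}.txt.gz".format(v) for v in i])
# ['my_directory/1.txt.gz', 'my_directory/2.txt.gz', 'my_directory/3.txt.gz']
```

This is exactly why the error showed `my_directory/['1', '2', '3'].txt.gz` as a single missing input.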

Also, to answer the other part of the question: if you don't want an aggregation-type rule (the rule whose input is the function gathering the unknown number of files), then you put that gather function as the input to your rule all instead. As this question shows, you can have downstream rules of your checkpoint that do not aggregate but perform actions on the individual, initially unknown files. You just use the wildcards extracted in your gather function and write the expand so that it produces the paths that the last per-file rule outputs.
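Putting both fixes together, here is a minimal sketch of the corrected Snakefile (same layout as the question's example): the gather function returns the gunzipped .txt paths and feeds rule all directly, so no aggregation rule is needed and gunzip_file runs once per discovered file.

```python
from os.path import join


def gather_gunzip_input(wildcards):
    out_dir = checkpoints.create_gzip_file.get(**wildcards).output[0]
    # Trailing comma unpacks the namedtuple into the list of matches.
    i, = glob_wildcards(join(out_dir, "{i}.txt.gz"))
    # Request the *gunzipped* outputs so gunzip_file runs per file.
    return expand(join(out_dir, "{i}.txt"), i=i)


rule all:
    input:
        gather_gunzip_input,


checkpoint create_gzip_file:
    output:
        directory("my_directory/"),
    shell:
        """
        mkdir my_directory/
        cd my_directory
        for i in 1 2 3; do gzip < /dev/null > $i.txt.gz; done
        """


rule gunzip_file:
    input:
        join("my_directory", "{i}.txt.gz"),
    output:
        join("my_directory", "{i}.txt"),
    shell:
        "gunzip -c {input} > {output}"
```

Running `snakemake --cores all` then schedules one gunzip_file job per .txt.gz file discovered after the checkpoint completes.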

