vkkodali
vkkodali

Reputation: 680

snakemake - how to make a list of input files based on a previous rule that produces variable number of files

Say, I am starting with a bunch of files like these:

group_1_in.txt, group_2_in.txt, group_3_in.txt

I process them using a rule that generates the directory structure shown below.

rule process_group_files:
    input: 'group_{num}_in.txt'
    output: directory('group_{num}')
    shell: "some_command {input} {output}'

## directory structure produced: 
group_1
    sample1_content.txt
    sample2_content.txt
    sample3_content.txt
group_2
    sample2_content.txt
    sample3_content.txt
    sample4_content.txt
group_3
    sample1_content.txt
    sample2_content.txt
    sample5_content.txt 

Then, I have rule that processes them to aggregate files by sample:

rule aggregate_by_sample:
    input: expand('{group}/{sample}_content.txt')
    output: '{sample}_allcontent.txt'
    shell: "cat {input} | some_command > {output}"

I expect the inputs for this rule to be:

group_1/sample1_content.txt, group_3/sample1_content.txt
group_1/sample2_content.txt, group_2/sample2_content.txt, group_3/sample2_content.txt
group_1/sample3_content.txt, group_2/sample3_content.txt
group_2/sample4_content.txt 
group_3/sample5_content.txt

and produce the following output files:

sample1_allcontent.txt
sample2_allcontent.txt
sample3_allcontent.txt
sample4_allcontent.txt
sample5_allcontent.txt

At this point, I want to work with these output files. So, the rule for this can be something like:

rule process_by_sample:
    input: <list of all sample_allcontent files>
    output: final_output.txt 
    shell: "cat {input} | some_other_command > {output}"

My question is this: how can I tell snakemake to wait until it has finished processing all of the files in aggregate_by_sample rule, then use that set of output files for the rule process_by_sample? I explored the idea of checkpoints by making aggregate_by_sample a checkpoint but I should be using a 'directory' as output since I don't know apriori how many output files will be produced. But I cannot do that because my output file names use wildcards and snakemake complains that Wildcards in input files cannot be determined from output files.

EDIT -- After seeing the answer by @troy-comi I realized that I oversimplified my issue. I updated my question to include the first rule process_group_files. All I know at the beginning of the pipeline is how many groups I have and what the 'number' wildcard list is.

Upvotes: 3

Views: 4161

Answers (1)

Troy Comi
Troy Comi

Reputation: 2079

Since the files already exist, you can use glob_wildcards to get a listing of the group/samples on the file system. Using that you can build up your input files with some more processing.

Here's my (untested) idea:

wc =  glob_wildcards('{group}/{sample}_content.txt')
samples_to_group = {}
for samp, group in zip(wc.group, wc.sample):
    if samp not in samples_to_group:
        samples_to_group[samp] = []
    samples_to_group.append(group)

# now samples_to_group is a map of which groups are present for each sample

rule all:
    input: "final_output.txt"

rule aggregate_by_sample:
    input: expand('{group}/{sample}_content.txt', 
                  group=samples_to_group[wildcards.sample],
                  allow_missing=True)
    output: '{sample}_allcontent.txt'
    shell: "cat {input} | some_command > {output}"

rule process_by_sample:
    input: expand('{sample}_allcontent.txt', sample=samples_to_group.keys())
    output: final_output.txt 
    shell: "cat {input} | some_other_command > {output}"

If another rule is producing the files you have to use checkpoints.

-- EDIT to answer refined question --

I can only get this to work if you know the samples beforehand, not necessary the group-sample mapping, just that you have 5 samples total...

Setting up a directory with the following files:

$ tail data/group_*.txt
==> data/group_1.txt <==
1
2
3

==> data/group_2.txt <==
2
3
4

==> data/group_3.txt <==
1
2
5

Then a Snakefile with

wildcard_constraints:
    num="\d+"

groups = glob_wildcards('data/group_{num}.txt').num
samples = range(1, 6)

rule all:
    input: "final_output.txt"

checkpoint process_group_files:
    input: 'data/group_{num}.txt'
    output: directory('data/group_{num}')
    shell:
        'mkdir {output} \n'
        'for line in $(cat {input}) ; do echo "$line {input}" '
            '> {output}/${{line}}_content.txt ; '
        'done \n'
        'sleep 1'

def aggregate_input(wildcards):
    for num in groups:
        checkpoints.process_group_files.get(num=num).output

    grps = glob_wildcards(f'data/group_{{group}}/{wildcards.sample}_content.txt').group
    return expand('data/group_{group}/{sample}_content.txt',
            group=grps,
            sample=wildcards.sample)


rule aggregate_by_sample:
    input: aggregate_input
    output: 'data/agg/{sample}_allcontent.txt'
    shell: 'cat {input} > {output}'

rule process_by_sample:
    input: expand('data/agg/{sample}_allcontent.txt', sample=samples)
    output: 'final_output.txt'
    shell: 'cat {input} > {output}'

will give a final output of:

$ cat final_output.txt
1 data/group_1.txt
1 data/group_3.txt
2 data/group_1.txt
2 data/group_2.txt
2 data/group_3.txt
3 data/group_1.txt
3 data/group_2.txt
4 data/group_2.txt
5 data/group_3.txt

The 'magic' is calling checkpoints with a for loop, which is the locking you need. Again, it requires knowing the samples before. You can try a second layer of checkpoints, but that usually fails. I also remember someone else having trouble with checkpoints in a for loop so it may break in a non-toy example. BTW this is snakemake 5.10

It may end up being easier splitting into two workflows honestly (snakemake -s Snakefile1 && snakemake -s Snakefile2)!

Good luck!

Upvotes: 1

Related Questions