Reputation: 680
Say, I am starting with a bunch of files like these:
group_1_in.txt, group_2_in.txt, group_3_in.txt
I process them using a rule that generates the directory structure shown below.
rule process_group_files:
    input: 'group_{num}_in.txt'
    output: directory('group_{num}')
    shell: "some_command {input} {output}"
## directory structure produced:
group_1
    sample1_content.txt
    sample2_content.txt
    sample3_content.txt
group_2
    sample2_content.txt
    sample3_content.txt
    sample4_content.txt
group_3
    sample1_content.txt
    sample2_content.txt
    sample5_content.txt
Then, I have a rule that processes them to aggregate files by sample:
rule aggregate_by_sample:
    input: expand('{group}/{sample}_content.txt')
    output: '{sample}_allcontent.txt'
    shell: "cat {input} | some_command > {output}"
I expect the inputs for this rule to be:
group_1/sample1_content.txt, group_3/sample1_content.txt
group_1/sample2_content.txt, group_2/sample2_content.txt, group_3/sample2_content.txt
group_1/sample3_content.txt, group_2/sample3_content.txt
group_2/sample4_content.txt
group_3/sample5_content.txt
and produce the following output files:
sample1_allcontent.txt
sample2_allcontent.txt
sample3_allcontent.txt
sample4_allcontent.txt
sample5_allcontent.txt
At this point, I want to work with these output files. So, the rule for this can be something like:
rule process_by_sample:
    input: <list of all sample_allcontent files>
    output: 'final_output.txt'
    shell: "cat {input} | some_other_command > {output}"
My question is this: how can I tell snakemake to wait until it has finished processing all of the files in the aggregate_by_sample rule, and then use that set of output files as the input for process_by_sample? I explored the idea of making aggregate_by_sample a checkpoint, but then I would have to use a directory() as its output, since I don't know a priori how many output files will be produced. And I cannot do that, because my output file names use wildcards, so snakemake complains that "Wildcards in input files cannot be determined from output files".
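For illustration, the failing attempt looked roughly like this (the directory name 'aggregated' is a placeholder I made up):
checkpoint aggregate_by_sample:
    input: '{group}/{sample}_content.txt'
    # a wildcard-free directory output gives snakemake no way to infer
    # {group} or {sample} for the input, hence the error above
    output: directory('aggregated')
    shell: "cat {input} | some_command > {output}"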
EDIT -- After seeing the answer by @troy-comi, I realized that I had oversimplified my issue, so I updated my question to include the first rule, process_group_files. All I know at the beginning of the pipeline is how many groups I have, i.e. the list of values for the {num} wildcard.
Upvotes: 3
Views: 4161
Reputation: 2079
Since the files already exist, you can use glob_wildcards to get a listing of the group/samples on the file system. Using that you can build up your input files with some more processing.
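For context, glob_wildcards returns a named tuple of parallel lists, one entry per file matching the pattern; with the tree from the question it would be roughly this (glob order is not guaranteed):
# glob_wildcards('{group}/{sample}_content.txt')
# -> Wildcards(group=['group_1', 'group_1', 'group_1', 'group_2', ...],
#              sample=['sample1', 'sample2', 'sample3', 'sample2', ...])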
Here's my (untested) idea:
wc = glob_wildcards('{group}/{sample}_content.txt')

# build a map of which groups are present for each sample
samples_to_group = {}
for group, samp in zip(wc.group, wc.sample):
    if samp not in samples_to_group:
        samples_to_group[samp] = []
    samples_to_group[samp].append(group)
rule all:
    input: "final_output.txt"
rule aggregate_by_sample:
    # the input must be a function (here a lambda) so that the group list
    # can be looked up from the sample wildcard when the DAG is built
    input:
        lambda wildcards: expand('{group}/{sample}_content.txt',
                                 group=samples_to_group[wildcards.sample],
                                 sample=wildcards.sample)
    output: '{sample}_allcontent.txt'
    shell: "cat {input} | some_command > {output}"

rule process_by_sample:
    input: expand('{sample}_allcontent.txt', sample=samples_to_group.keys())
    output: 'final_output.txt'
    shell: "cat {input} | some_other_command > {output}"
If another rule is producing the files, you have to use checkpoints.
-- EDIT to answer refined question --
I can only get this to work if you know the samples beforehand: not necessarily the group-to-sample mapping, just that you have 5 samples total...
Setting up a directory with the following files:
$ tail data/group_*.txt
==> data/group_1.txt <==
1
2
3
==> data/group_2.txt <==
2
3
4
==> data/group_3.txt <==
1
2
5
Then a Snakefile with
wildcard_constraints:
    num=r"\d+"

groups = glob_wildcards('data/group_{num}.txt').num
samples = range(1, 6)
rule all:
    input: "final_output.txt"
checkpoint process_group_files:
    input: 'data/group_{num}.txt'
    output: directory('data/group_{num}')
    # writes one <line>_content.txt file into the group directory
    # for every line of the input file
    shell:
        'mkdir {output} \n'
        'for line in $(cat {input}) ; do echo "$line {input}" '
        '> {output}/${{line}}_content.txt ; '
        'done \n'
        'sleep 1'
def aggregate_input(wildcards):
    # touch every checkpoint output first; this makes snakemake wait
    # until all group directories exist before the glob below runs
    for num in groups:
        checkpoints.process_group_files.get(num=num).output
    grps = glob_wildcards(f'data/group_{{group}}/{wildcards.sample}_content.txt').group
    return expand('data/group_{group}/{sample}_content.txt',
                  group=grps,
                  sample=wildcards.sample)
rule aggregate_by_sample:
    input: aggregate_input
    output: 'data/agg/{sample}_allcontent.txt'
    shell: 'cat {input} > {output}'

rule process_by_sample:
    input: expand('data/agg/{sample}_allcontent.txt', sample=samples)
    output: 'final_output.txt'
    shell: 'cat {input} > {output}'
will give a final output of:
$ cat final_output.txt
1 data/group_1.txt
1 data/group_3.txt
2 data/group_1.txt
2 data/group_2.txt
2 data/group_3.txt
3 data/group_1.txt
3 data/group_2.txt
4 data/group_2.txt
5 data/group_3.txt
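To reproduce, a plain run from the Snakefile's directory should be enough, since rule all requests final_output.txt:
$ snakemake --cores 1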
The 'magic' is calling the checkpoints in a for loop, which provides the locking you need. Again, it requires knowing the samples beforehand. You can try a second layer of checkpoints, but that usually fails, and I remember someone else having trouble with checkpoints in a for loop, so it may break in a non-toy example. BTW, this is snakemake 5.10.
Honestly, it may end up being easier to split this into two workflows (snakemake -s Snakefile1 && snakemake -s Snakefile2)!
Good luck!
Upvotes: 1