Reputation: 157
I am struggling with an issue that probably has a very basic solution. In my Nextflow (DSL2) workflow, I have a number of processes that each output a tuple of a file and a value indicating the group the input element belongs to. A final process then collects, for each group, all the generated files and works on them. For example:
workflow {
    generatePDF(input_channel)
    generateCSV(input_channel)
    generateTXT(input_channel)
    mergeprocess( /* grouped input files from previous 3 processes */ )
}
As mentioned, the output of each generate* process is
tuple path("output_${samplename}.${extension}"), val(group)
For example, if samples 1, 2 and 3 belong to group A, sample 4 to group B, and samples 5 and 6 to group C, I would like to pass the following as inputs to the last process (one line per group):
output_sample1.pdf output_sample2.pdf output_sample3.pdf output_sample1.csv output_sample2.csv output_sample3.csv output_sample1.txt output_sample2.txt output_sample3.txt
output_sample4.pdf output_sample4.csv output_sample4.txt
output_sample5.pdf output_sample6.pdf output_sample5.csv output_sample6.csv output_sample5.txt output_sample6.txt
I have tried combinations of collect(), groupTuple() and even join(), but none gave me the channel I need.
Thanks for your time.
Upvotes: 2
Views: 1205
Reputation: 54502
One solution, which is perhaps the 'basic' solution you may have been working towards, is to mix your process outputs and then call the groupTuple operator, specifying the index of the element to be used as the grouping key using the by parameter:
generatePDF.out
| mix( generateCSV.out, generateTXT.out )
| groupTuple( by: 1 )
| view()
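To make the emitted shape concrete, here is a minimal, self-contained sketch. The three channels and their contents are made up for illustration: plain strings stand in for the real generate* outputs, in the same (file, group) order as in the question. Grouping by index 1 emits [ files, group ], so a final map is assumed here only to put the group first:

```nextflow
nextflow.enable.dsl=2

workflow {
    // Hypothetical stand-ins for generatePDF.out, generateCSV.out and generateTXT.out
    pdf_ch = Channel.of( ['output_sample1.pdf', 'A'], ['output_sample4.pdf', 'B'] )
    csv_ch = Channel.of( ['output_sample1.csv', 'A'], ['output_sample4.csv', 'B'] )
    txt_ch = Channel.of( ['output_sample1.txt', 'A'], ['output_sample4.txt', 'B'] )

    pdf_ch
        | mix( csv_ch, txt_ch )                           // one channel of (file, group) tuples
        | groupTuple( by: 1 )                             // group on the second element
        | map { files, group -> tuple( group, files ) }   // optional: put the group first
        | view()
}
```

The order of the items inside each grouped list is not guaranteed, since mix emits items as they arrive. A downstream process would declare its input to match whichever order you settle on, for example tuple val(group), path(input_files) after the map above.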
However, this solution will 'wait' for all generate processes (generatePDF, generateCSV, generateTXT) to be completed before proceeding to merge any of the grouped outputs. This is because groupTuple ideally needs to know the number of items the grouped lists should contain:
"You should always specify the number of expected elements in each tuple using the size attribute to allow the groupTuple operator to stream the collected values as soon as possible."
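As a rough illustration of the size attribute, the sketch below assumes every key collects exactly three items. The sample names and file names are toy values, not real process outputs. With size: 3, each group can be emitted as soon as its third item arrives rather than when the upstream channel closes:

```nextflow
nextflow.enable.dsl=2

workflow {
    // Toy data: each sample is assumed to produce exactly three files
    Channel.of(
            [ 'sample1', 'output_sample1.pdf' ],
            [ 'sample2', 'output_sample2.pdf' ],
            [ 'sample1', 'output_sample1.csv' ],
            [ 'sample1', 'output_sample1.txt' ],
            [ 'sample2', 'output_sample2.csv' ],
            [ 'sample2', 'output_sample2.txt' ]
        )
        | groupTuple( size: 3 )   // emit each group once it holds three items
        | view()
}
```

A single fixed size only helps when all groups are equally large. In your case the groups contain different numbers of samples, which is why a per-group size is needed.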
If your generate processes all determine the same group for a given sample (which they probably do), you should be able to decouple this logic in a way that lets you predetermine the size of each group. You could then create a special grouping key (e.g. using the groupKey function) and use that to group the process outputs. The following example simply predetermines the group from the parent directory, but it should hopefully be applicable to your use case:
First, let's create some test data:
mkdir -p input_files/{A,B,C}
touch input_files/A/sample{1,2,3}.ext
touch input_files/B/sample4.ext
touch input_files/C/sample{5,6}.ext
Then, put the following in a file called script.nf:
nextflow.enable.dsl=2

process generatePDF {

    input:
    tuple val(sample_name), path(input_file)

    output:
    tuple val(sample_name), path("output_${sample_name}.pdf")

    """
    touch "output_${sample_name}.pdf"
    """
}

process generateCSV {

    input:
    tuple val(sample_name), path(input_file)

    output:
    tuple val(sample_name), path("output_${sample_name}.csv")

    """
    touch "output_${sample_name}.csv"
    """
}

process generateTXT {

    input:
    tuple val(sample_name), path(input_file)

    output:
    tuple val(sample_name), path("output_${sample_name}.txt")

    """
    touch "output_${sample_name}.txt"
    """
}

process merge_process {

    tag { group }

    echo true

    input:
    tuple val(group), path(input_files)

    """
    echo "group: ${group}"
    ls ${input_files}
    """
}

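workflow {

    // Group the inputs by parent directory to learn each group's size, then wrap
    // the group name in a groupKey carrying that size and un-group again.
    Channel.fromPath( './input_files/*/*.ext' )
        | map { infile -> tuple( infile.parent.name, infile ) }
        | groupTuple()
        | map { group, files -> tuple( groupKey(group, files.size()), files) }
        | transpose()
        | set { input_ch }

    // Run the three generate processes on every sample and collect the
    // three outputs of each sample into one tuple.
    input_ch
        | map { key, infile -> tuple( infile.baseName, infile ) }
        | ( generatePDF & generateCSV & generateTXT )
        | mix
        | groupTuple( size: 3 )
        | set { outputs_ch }

    // Re-attach the group key to each sample's outputs. The final groupTuple
    // can emit a group as soon as it holds the number of samples stored in its key.
    input_ch
        | map { key, infile -> tuple( infile.baseName, key ) }
        | join( outputs_ch )
        | map { sample, key, files -> tuple( key, files ) }
        | groupTuple()
        | map { key, grp_files -> tuple( key.toString(), grp_files.flatten() ) }
        | merge_process
}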
Results:
$ nextflow run script.nf
N E X T F L O W ~ version 20.10.0
Launching `script.nf` [irreverent_lavoisier] - revision: aba248d32e
executor > local (21)
[5d/5cd70d] process > generatePDF (1) [100%] 6 of 6 ✔
[4c/c58256] process > generateCSV (2) [100%] 6 of 6 ✔
[d2/93402c] process > generateTXT (5) [100%] 6 of 6 ✔
[9b/04ea07] process > merge_process (C) [100%] 3 of 3 ✔
group: B
output_sample4.csv
output_sample4.pdf
output_sample4.txt
group: A
output_sample1.csv
output_sample1.pdf
output_sample1.txt
output_sample2.csv
output_sample2.pdf
output_sample2.txt
output_sample3.csv
output_sample3.pdf
output_sample3.txt
group: C
output_sample5.csv
output_sample5.pdf
output_sample5.txt
output_sample6.csv
output_sample6.pdf
output_sample6.txt
Upvotes: 3