I have a workflow consisting of 2 subworkflows.
params.reads = "$projectDir/data/raw/reads/*_{1,2}.fastq.gz"
params.kaiju_db = "$projectDir/data/kaijudb/viruses/kaiju_db_viruses.fmi"
params.kaiju_names = "$projectDir/data/kaijudb/viruses/names.dmp"
params.kaiju_nodes = "$projectDir/data/kaijudb/viruses/nodes.dmp"

workflow subworkflow_A {
    take:
    reads // channel: [ val(sample), [ reads ] ]

    main:
    count_reads(reads)
    trim_reads(reads)

    emit:
    trimmed_reads = trim_reads.out.reads // channel: [ val(sample), [ trimmed_reads ] ]
}

workflow subworkflow_B {
    take:
    reads // channel: [ val(sample), [ reads ] ]
    db    // channel: /path/to/kaiju/db.fmi
    nodes // channel: /path/to/kaiju/nodes/file
    names // channel: /path/to/kaiju/names/file

    main:
    taxonomic_classification(reads, nodes, db)
    kaiju_to_krona(taxonomic_classification.out, nodes, names)
    krona_import_text(kaiju_to_krona.out)
    kaiju_to_table(taxonomic_classification.out, nodes, names)
}

workflow main {
    ch_reads = Channel.fromFilePairs("$params.reads", checkIfExists: true)
    subworkflow_A(ch_reads)

    ch_db = Channel.fromPath("$params.kaiju_db", checkIfExists: true)
    ch_nodes = Channel.fromPath("$params.kaiju_nodes", checkIfExists: true)
    ch_names = Channel.fromPath("$params.kaiju_names", checkIfExists: true)

    ch_trimmed_reads = subworkflow_A.out.trimmed_reads
    subworkflow_B(ch_trimmed_reads, ch_db, ch_nodes, ch_names)
}
The input for params.reads is a directory like:
reads/
├── test_sample1_1.fastq.gz
├── test_sample1_2.fastq.gz
├── test_sample2_1.fastq.gz
└── test_sample2_2.fastq.gz
The input for subworkflow_A, ch_reads, is:
[test_sample1, [~project/data/raw/reads/test_sample1_1.fastq.gz, ~project/data/raw/reads/test_sample1_2.fastq.gz]]
[test_sample2, [~project/data/raw/reads/test_sample2_1.fastq.gz, ~project/data/raw/reads/test_sample2_2.fastq.gz]]
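To make the [sample, [read1, read2]] shape above concrete, here is a toy Python model of how the pattern *_{1,2}.fastq.gz groups the four files by sample. It is a simplification, not Nextflow's implementation: it assumes the sample ID is whatever * matches, works on bare file names rather than full paths, and group_pairs is a hypothetical helper.

```python
import re
from collections import defaultdict

# Hypothetical toy model of how "*_{1,2}.fastq.gz" groups files into pairs.
# Assumption: the sample ID is whatever "*" matches, i.e. the file name
# minus the "_1.fastq.gz" / "_2.fastq.gz" suffix.
PAIR_PATTERN = re.compile(r"^(?P<sample>.+)_[12]\.fastq\.gz$")


def group_pairs(filenames):
    """Return [(sample, [mate files])] tuples, mirroring the shape of ch_reads."""
    groups = defaultdict(list)
    for name in filenames:
        match = PAIR_PATTERN.match(name)
        if match:  # names that do not fit the pattern are skipped
            groups[match.group("sample")].append(name)
    # Sorted here only to make the result deterministic
    return [(sample, sorted(files)) for sample, files in sorted(groups.items())]


files = [
    "test_sample2_2.fastq.gz",
    "test_sample1_1.fastq.gz",
    "test_sample2_1.fastq.gz",
    "test_sample1_2.fastq.gz",
]
for item in group_pairs(files):
    print(item)
# ('test_sample1', ['test_sample1_1.fastq.gz', 'test_sample1_2.fastq.gz'])
# ('test_sample2', ['test_sample2_1.fastq.gz', 'test_sample2_2.fastq.gz'])
```

Each tuple is one item of a queue channel, so ch_reads carries two items, one per sample.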
subworkflow_A then emits the following channel into ch_trimmed_reads:
[test_sample1, [~project/work/51/240e81f0a30e7e4c1d932abfe97502/test_sample1.trim.R1.fq.gz, ~project/work/51/240e81f0a30e7e4c1d932abfe97502/test_sample1.trim.R2.fq.gz]]
[test_sample2, [~project/work/b2/d38399833f3adf11d4e8c6d85ec293/test_sample2.trim.R1.fq.gz, ~project/work/b2/d38399833f3adf11d4e8c6d85ec293/test_sample2.trim.R2.fq.gz]]
For some reason, subworkflow_B only runs on the first sample, test_sample1, and not on the second sample, test_sample2, even though I want it to run over both samples.
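Channel.fromPath returns a queue channel, and each of ch_db, ch_nodes and ch_names emits only one file. The first task consumes that file, so nothing is left to pair with test_sample2 and the processes in subworkflow_B run only once. Note that a value channel is implicitly created by a process when it is invoked with a simple value. This means you can just pass in a plain file object. For example: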
workflow main {
    ch_reads = Channel.fromFilePairs( params.reads, checkIfExists: true )

    // file() returns a plain Path; when it reaches a process it acts as a value channel
    db = file( params.kaiju_db )
    nodes = file( params.kaiju_nodes )
    names = file( params.kaiju_names )

    subworkflow_B( ch_reads, db, nodes, names )
}
Most of the time, what you want is one queue channel and one or more value channels when your process requires multiple input channels:
When two or more channels are declared as process inputs, the process waits until there is a complete input configuration, i.e. until it receives a value from each input channel. When this condition is satisfied, the process consumes a value from each channel and launches a new task, repeating this logic until one or more channels are empty.
As a result, channel values are consumed sequentially and any empty channel will cause the process to wait, even if the other channels have values.
A different semantic is applied when using a value channel. This kind of channel is created by the Channel.value factory method or implicitly when a process is invoked with an argument that is not a channel. By definition, a value channel is bound to a single value and it can be read an unlimited number of times without consuming its content. Therefore, when mixing a value channel with one or more (queue) channels, it does not affect the process termination because the underlying value is applied repeatedly.
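The matching rule quoted above can be modelled in a few lines of Python. This is a toy sketch, not how Nextflow actually schedules tasks: a plain list stands in for a queue channel, the hypothetical Value class stands in for a value channel, and the hypothetical launch_tasks function pairs up inputs the way the quoted paragraphs describe. The arguments follow the order of taxonomic_classification(reads, nodes, db) in the question, with file names as plain strings.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Value:
    """Toy value channel: bound to one item that can be read any number of times."""
    item: Any


def launch_tasks(*inputs):
    """Toy model of the input-matching rule quoted above (not Nextflow's scheduler).

    A plain list stands in for a queue channel: each task consumes its next item.
    A Value is reused for every task. Tasks are launched until a queue is empty.
    """
    if all(isinstance(ch, Value) for ch in inputs):
        raise ValueError("this toy model needs at least one queue (list) input")
    positions = [0] * len(inputs)
    tasks = []
    while True:
        args = []
        for i, ch in enumerate(inputs):
            if isinstance(ch, Value):
                args.append(ch.item)            # value channel: read, not consumed
            elif positions[i] < len(ch):
                args.append(ch[positions[i]])   # queue channel: consume next item
                positions[i] += 1
            else:
                return tasks                    # a queue is empty: no more tasks
        tasks.append(tuple(args))


reads = ["test_sample1", "test_sample2"]   # one item per sample
nodes, db = "nodes.dmp", "kaiju_db_viruses.fmi"

# Question: nodes and db come from Channel.fromPath, i.e. single-item queues
print(launch_tasks(reads, [nodes], [db]))
# [('test_sample1', 'nodes.dmp', 'kaiju_db_viruses.fmi')]

# Answer: plain file objects, treated as value channels
print(launch_tasks(reads, Value(nodes), Value(db)))
# [('test_sample1', 'nodes.dmp', 'kaiju_db_viruses.fmi'), ('test_sample2', 'nodes.dmp', 'kaiju_db_viruses.fmi')]
```

Because the Value inputs are never exhausted, the number of tasks is set by the reads queue alone, which is why passing plain file() objects lets every sample through.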