spitfiredd

Reputation: 3135

Nextflow only processes one of my paired samples in a subworkflow

I have a workflow consisting of 2 subworkflows.

params.reads = "$projectDir/data/raw/reads/*_{1,2}.fastq.gz"
params.kaiju_db = "$projectDir/data/kaijudb/viruses/kaiju_db_viruses.fmi"
params.kaiju_names = "$projectDir/data/kaijudb/viruses/names.dmp"
params.kaiju_nodes = "$projectDir/data/kaijudb/viruses/nodes.dmp"

workflow subworkflow_A {

  take:
    reads                                      // channel: [ val(sample), [ reads ] ]

  main:
    count_reads(reads)
    trim_reads(reads)

  emit:
    trimmed_reads = trim_reads.out.reads      // channel: [ val(sample), [ trimmed_reads ] ]
}

workflow subworkflow_B {

  take:
    reads                                      // channel: [ val(sample), [ reads ] ]
    db            // channel: /path/to/kaiju/db.fmi
    nodes         // channel: /path/to/kaiju/nodes/file
    names         // channel: /path/to/kaiju/names/file

  main:
    taxonomic_classification(reads, nodes, db)
    kaiju_to_krona(taxonomic_classification.out, nodes, names)
    krona_import_text(kaiju_to_krona.out)
    kaiju_to_table(taxonomic_classification.out, nodes, names)
}

workflow {
    ch_reads = Channel.fromFilePairs("$params.reads", checkIfExists:true)
    subworkflow_A(ch_reads)
    
    ch_db = Channel.fromPath("$params.kaiju_db", checkIfExists: true)
    ch_nodes = Channel.fromPath("$params.kaiju_nodes", checkIfExists: true)
    ch_names = Channel.fromPath("$params.kaiju_names", checkIfExists: true)
    ch_trimmed_reads = subworkflow_A.out.trimmed_reads
    subworkflow_B(ch_trimmed_reads, ch_db, ch_nodes, ch_names)
}

params.reads points to a directory like this:

reads/
├── test_sample1_1.fastq.gz
├── test_sample1_2.fastq.gz
├── test_sample2_1.fastq.gz
└── test_sample2_2.fastq.gz

The input to subworkflow_A, ch_reads, is:

[test_sample1, [~project/data/raw/reads/test_sample1_1.fastq.gz, ~project/data/raw/reads/test_sample1_2.fastq.gz]]
[test_sample2, [~project/data/raw/reads/test_sample2_1.fastq.gz, ~project/data/raw/reads/test_sample2_2.fastq.gz]]

subworkflow_A then emits the following items into ch_trimmed_reads:

[test_sample1, [~project/work/51/240e81f0a30e7e4c1d932abfe97502/test_sample1.trim.R1.fq.gz, ~project/work/51/240e81f0a30e7e4c1d932abfe97502/test_sample1.trim.R2.fq.gz]]
[test_sample2, [~project/work/b2/d38399833f3adf11d4e8c6d85ec293/test_sample2.trim.R1.fq.gz, ~project/work/b2/d38399833f3adf11d4e8c6d85ec293/test_sample2.trim.R2.fq.gz]]

For some reason, subworkflow_B only runs the first sample, test_sample1, and never the second sample, test_sample2, even though I want it to run over both samples.

Upvotes: 1

Views: 313

Answers (1)

Steve

Reputation: 54502

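The problem is that Channel.fromPath returns a queue channel. Each of ch_db, ch_nodes and ch_names holds exactly one item, which is consumed by the first task, so nothing is left to pair with the second sample. Note that a value channel is implicitly created by a process when it is invoked with a simple value, i.e. an argument that is not a channel. This means you can just pass in a plain file object. For example: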

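workflow {
    ch_reads = Channel.fromFilePairs( params.reads, checkIfExists: true )

    // Plain file objects rather than channels: each process call inside
    // subworkflow_B wraps them in a value channel, so every sample reuses them
    db    = file( params.kaiju_db, checkIfExists: true )
    nodes = file( params.kaiju_nodes, checkIfExists: true )
    names = file( params.kaiju_names, checkIfExists: true )

    subworkflow_A( ch_reads )
    subworkflow_B( subworkflow_A.out.trimmed_reads, db, nodes, names )
}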
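If you would rather keep Channel.fromPath with checkIfExists, another option (not part of the original answer) is to turn each single-item queue channel into a value channel with the first operator, whose output is a value channel. This is a sketch that assumes the params and both subworkflows from the question are defined in the same script, and that subworkflow_A emits trimmed_reads as shown there.

```nextflow
workflow {
    ch_reads = Channel.fromFilePairs( params.reads, checkIfExists: true )

    // first() emits a value channel, so each database file can be read
    // by every task in subworkflow_B instead of only the first one.
    ch_db    = Channel.fromPath( params.kaiju_db,    checkIfExists: true ).first()
    ch_nodes = Channel.fromPath( params.kaiju_nodes, checkIfExists: true ).first()
    ch_names = Channel.fromPath( params.kaiju_names, checkIfExists: true ).first()

    subworkflow_A( ch_reads )
    subworkflow_B( subworkflow_A.out.trimmed_reads, ch_db, ch_nodes, ch_names )
}
```

Either approach gives subworkflow_B one queue channel (the reads) plus value channels for the reference files.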

When your process requires multiple input channels, what you want most of the time is one queue channel and one or more value channels. From the Nextflow documentation on multiple input channels:

When two or more channels are declared as process inputs, the process waits until there is a complete input configuration, i.e. until it receives a value from each input channel. When this condition is satisfied, the process consumes a value from each channel and launches a new task, repeating this logic until one or more channels are empty.

As a result, channel values are consumed sequentially and any empty channel will cause the process to wait, even if the other channels have values.
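The queue-channel behaviour described above can be reproduced with a minimal, hypothetical toy pipeline (the process pair_inputs and its values are made up for illustration). The samples channel has two items but the db channel has only one, which mirrors the situation in the question:

```nextflow
// Hypothetical toy process: two val inputs, one item needed from each per task.
process pair_inputs {
    input:
    val sample
    val db

    output:
    stdout

    script:
    """
    echo "${sample} uses ${db}"
    """
}

workflow {
    ch_samples = Channel.of('test_sample1', 'test_sample2') // queue channel, 2 items
    ch_db      = Channel.of('kaiju_db')                     // queue channel, 1 item

    // The single db item is consumed by the first task, so only one task is
    // launched; the second sample never gets a complete input configuration.
    pair_inputs(ch_samples, ch_db)
    pair_inputs.out.view()
}
```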

A different semantic is applied when using a value channel. This kind of channel is created by the Channel.value factory method or implicitly when a process is invoked with an argument that is not a channel. By definition, a value channel is bound to a single value and it can be read an unlimited number of times without consuming its content. Therefore, when mixing a value channel with one or more (queue) channels, it does not affect the process termination because the underlying value is applied repeatedly.
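A minimal, hypothetical sketch of the value-channel case (the process pair_inputs and its values are made up for illustration): wrapping the single item in Channel.value lets it be read once per task, so both samples run.

```nextflow
// Hypothetical toy process: two val inputs, one item needed from each per task.
process pair_inputs {
    input:
    val sample
    val db

    output:
    stdout

    script:
    """
    echo "${sample} uses ${db}"
    """
}

workflow {
    ch_samples = Channel.of('test_sample1', 'test_sample2') // queue channel, 2 items
    ch_db      = Channel.value('kaiju_db')                  // value channel, never consumed

    // One task per sample: the value channel is re-read for every task.
    // Calling pair_inputs(ch_samples, 'kaiju_db') with a plain string should
    // behave the same, because the process wraps non-channel arguments in a
    // value channel implicitly.
    pair_inputs(ch_samples, ch_db)
    pair_inputs.out.view()
}
```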

Upvotes: 1
