1288Meow

Reputation: 339

Nextflow DSL2 output from different processes mixed up as input in later processes

I have a DSL2 Nextflow pipeline that branches out into two FILTER processes. Then in the CONCAT process, I reuse the two previous process outputs as input. In the SUMMARIZE process, I also reuse previous process outputs as input.

I am finding that when I run the pipeline with two or more pairs of fastq samples, the inputs get mixed up.

For example, at the CONCAT step, I end up concatenating the bwa_2_ch output of one pair of fastq samples with the filter_1_ch output of another pair of fastq samples, instead of combining samples with the same pair_id.

I believe I am not writing the workflow { } channels and inputs entirely correctly, so the workflow does not run through the steps without mixing samples. But I am not sure how to define the inputs so that there is no mix-up.

//trimmomatic read trimming
process TRIM {

    tag "trim ${pair_id}"   

    publishDir "${params.outdir}/$pair_id/trim_results"

    input:
    tuple val(pair_id), path(reads) 

    output:
    tuple val(pair_id), path("trimmed_${pair_id}_...")

    script:
    """
    """
}


//bwa alignment
process BWA_1 {

    tag "align-1 ${pair_id}f"

    publishDir "${params.outdir}/$pair_id/..."

    input:
    tuple val(pair_id), path(reads)
    path index

    output:
    tuple val(pair_id), path("${pair_id}_...")

    script:
    """
    """
}

process FILTER_1 {

    tag "filter ${pair_id}"

    publishDir "${params.outdir}/$pair_id/filter_results"

    input:
    tuple val(pair_id), path(reads)

    output:
    tuple val(pair_id), 
    path("${pair_id}_...")

    script:
    """
    """
}

process FILTER_2 {

    tag "filter ${pair_id}"

    publishDir "${params.outdir}/$pair_id/filter_results"

    input:
    tuple val(pair_id), path(reads)

    output:
    tuple val(pair_id), 
    path("${pair_id}_...")

    script:
    """
    """
}

//bwa alignment
process BWA_2 {

    tag "align-2 ${pair_id}"

    publishDir "${params.outdir}/$pair_id/bwa_2_results"

    input:
    tuple val(pair_id), path(reads)
    path index

    output:
    tuple val(pair_id), path("${pair_id}_...")

    script:
    """
    """
}


//concatenate pf and non_human reads
process CONCAT {

    tag "concat ${pair_id}"

    publishDir "${params.outdir}/$pair_id"

    input:
    tuple val(pair_id), path(program_reads)
    tuple val(pair_id), path(pf_reads)

    output:
    tuple val(pair_id), path("${pair_id}_...")

    script:
    """
    """
}

//summary
process SUMMARY {

    tag "summary ${pair_id}"

    publishDir "${params.outdir}/$pair_id"

    input:
    tuple val(pair_id), path(trim_reads)
    tuple val(pair_id), path(non_human_reads)

    output:
    file("summary_${pair_id}.csv")

    script:
    """
    """
}

workflow {
    Channel
        .fromFilePairs(params.reads, checkIfExists: true)
        .set {read_pairs_ch}

    // trim reads
    trim_ch = TRIM(read_pairs_ch)

    // map to pf genome
    bwa_1_ch = BWA_1(trim_ch, params.pf_index)

    // filter mapped reads
    filter_1_ch = FILTER_1(bwa_1_ch)
    filter_2_ch = FILTER_2(bwa_1_ch)

    // map to pf and human genome
    bwa_2_ch = BWA_2(filter_2_ch, params.index)

    // concatenate non human reads
    concat_ch = CONCAT(bwa_2_ch,filter_1_ch)

    // summarize
    summary_ch = SUMMARY(trim_ch,concat_ch)
}

Upvotes: 1

Views: 756

Answers (1)

Steve

Reputation: 54502

Mix-ups like this usually occur when a process erroneously receives two or more queue channels. Most of the time, what you want is one queue channel and one or more value channels when you require multiple input channels. Here, I'm not sure exactly what pair_id would be bound to, but it likely won't be what you expect:
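To illustrate the failure mode with a toy example (the channel contents below are made up, not from the pipeline above): queue channel items are paired positionally across separate input channels, and emission order is not guaranteed, so two queue channels can easily get out of step:

```nextflow
// Two queue channels whose items happen to arrive in different orders.
// A process declaring both as separate input channels would pair
// ['S1', ...] from one with ['S2', ...] from the other -- exactly the
// cross-sample mix-up described in the question.
program_ch = Channel.of( ['S1', 'prog_S1.fq'], ['S2', 'prog_S2.fq'] )
pf_ch      = Channel.of( ['S2', 'pf_S2.fq'],   ['S1', 'pf_S1.fq'] )
```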

input:
tuple val(pair_id), path(program_reads)
tuple val(pair_id), path(pf_reads)

What you want to do is replace the above with:

input:
tuple val(pair_id), path(program_reads), path(pf_reads)

And then use the join operator to create the required inputs. For example:

workflow {

    Channel
        .fromFilePairs( params.reads, checkIfExists: true )
        .set { read_pairs_ch }

    pf_index = file( params.pf_index )
    bwa_index = file( params.bwa_index )

    // trim reads
    trim_ch = TRIM( read_pairs_ch )

    // map to pf genome
    bwa_1_ch = BWA_1( trim_ch, pf_index)

    // filter mapped reads
    filter_1_ch = FILTER_1(bwa_1_ch)
    filter_2_ch = FILTER_2(bwa_1_ch)

    // map to pf and human genome
    bwa_2_ch = BWA_2(filter_2_ch, bwa_index)

    // concatenate non human reads
    concat_ch = bwa_2_ch \
        | join( filter_1_ch ) \
        | CONCAT

    // summarize
    summary_ch = trim_ch \
        | join( concat_ch ) \
        | SUMMARY
}

Upvotes: 1
