real_name

Reputation: 21

Nextflow, passing output of one process to the next

I am completely new to Nextflow.

I am trying to obtain a value from one process (DO_FIRST) and pass it as an input parameter to the next process (DO_SECOND) in a workflow.

DO_SECOND is already a working script to which I simply want to add one input parameter. My only question about that script is which input qualifier I should use for the new parameter.

My minimal working example looks like:

process DO_FIRST {
    
    input:
    path(bam)

    exec:
    println "##### getting scale factor #####"
    
    output:
    // stdout emit: scale // Couldn't access the scale value
    val scale_factor // throws error:  Missing value declared as output parameter: scale
    script:
    """
    samtools view -f 0 -c $bam | awk '{printf "%.6f", 1000000/\$1}'
    """
}
workflow {
    bam_file = file('random.bam')
    SAMTOOLS_SCALE(bam_file) 
    // Print the scale factor
    println "Scale factor for bam file: $scale_factor"

    DO_SECOND (
           bam_file,
            scale_factor // what should the input qualifier be?
       )

}

The command executed in DO_FIRST should give a single float value. I do not get any errors from that command, although I am not sure it actually executes. My questions:

  1. How do I output the float value from the command in DO_FIRST so that I can access it in the workflow (which output qualifier, etc.)?
  2. How do I pass this value on to the DO_SECOND process (which input qualifier)? Can I just add an extra input declared as val scale_factor?

Upvotes: 1

Views: 37

Answers (1)

Steve

Reputation: 54562

I think you can continue using the stdout output qualifier here, and you can use the val qualifier to receive the resulting String value downstream. But assuming you want a scaling factor for each alignment file, you will want to use tuples so that each channel item carries the sample name along with its value. This makes joining up the channels much (much) easier. For example:

params.bams = '/path/to/files/*.bam'


process COUNT_READS {

    tag "${sample}"

    input:
    tuple val(sample), path(bam)

    output:
    tuple val(sample), stdout

    script:
    """
    samtools view -c "${bam}"
    """
}

process DO_SOMETHING {

    tag "${sample}"

    debug true

    input:
    tuple val(sample), val(scale_factor), path(bam)

    script:
    """
    echo "${sample}, ${scale_factor}"
    """
}
workflow {

    samples_ch = Channel.fromFilePairs( params.bams, size: 1, flat: true )

    COUNT_READS( samples_ch )
        | map { sample, read_count ->
            // stdout arrives as a String, so cast it before dividing
            def scale_factor = sprintf("%.6f", 1000000 / (read_count as Double))

            tuple( sample, scale_factor )
        }
        | join( samples_ch )
        | DO_SOMETHING 
}
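
If you really only have the one BAM file, a minimal sketch closer to your original example might look like the following. Note that a process takes either a script block or an exec block, not both, and that the output block is declared before the script. The body of DO_SECOND here is just a hypothetical placeholder, since your real script wasn't shown, and I have assumed samtools and awk are available on the PATH:

```groovy
process DO_FIRST {

    input:
    path bam

    output:
    stdout

    script:
    """
    samtools view -c "${bam}" | awk '{ printf "%.6f", 1000000 / \$1 }'
    """
}

process DO_SECOND {

    debug true

    input:
    path bam
    val scale_factor

    script:
    // Placeholder command (assumption): replace with your existing script
    """
    echo "${bam}: ${scale_factor}"
    """
}

workflow {

    bam_file = file('random.bam')

    DO_FIRST( bam_file )

    // Process outputs are channels, so use view() rather than println
    DO_FIRST.out.view { scale_factor -> "Scale factor for bam file: ${scale_factor}" }

    DO_SECOND( bam_file, DO_FIRST.out )
}
```

So the extra input in DO_SECOND can indeed be declared as val scale_factor. The value only exists once DO_FIRST has run, which is why it has to be read from DO_FIRST.out instead of from a plain variable in the workflow block.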

Results:

$ nextflow run main.nf

 N E X T F L O W   ~  version 24.10.4

Launching `main.nf` [small_banach] DSL2 - revision: f30dea4f35

executor >  local (6)
[26/f2965b] process > COUNT_READS (foo)  [100%] 3 of 3 ✔
[82/47f739] process > DO_SOMETHING (foo) [100%] 3 of 3 ✔
baz, 1.424347

bar, 1.209417

foo, 0.548415

Upvotes: 0
