ul.Duc
ul.Duc

Reputation: 41

nextflow's process don't chain, stops after first success

I'm working on a colleague's pipeline Nextflow 19.04 and i have a weird behavior.

in every case i used it, it was working like intended but recently we changed the technology of input data without modifying anything of the format or anything else. The first process called "init" runs, succeed and nextflow doesn't chain with the next one

process init {
    output:
        stdout into init_ch
    script:
    """
    head -n 1 ${params.config} | awk '{print \$1}' |  tr -d "\n"
    """
}

bampbi_ch=bamtuple_ch.join(pbituple_ch).combine(init_ch)

process rename {
    publishDir path: "${params.publishdirResults}/Correction_Stats", mode: 'copy' , pattern : '*.stats'
    input:
        set ID, file(bam), file(pbi), val(espece) from bampbi_ch
    output:
        set val("${name}"), file("${name}.bam"), file ("${name}.bam.pbi") into bampbi2_ch, bampbi3_ch, bampbi4_ch, bampbi_forbf_ch
        file "*.stats" into correctionstats_ch
    script:
        name=espece+"_"+params.target
        """
        cp $bam ${name}.bam
        cp $pbi ${name}.bam.pbi
        size=`ls -sh | grep "$name" | grep ".bam\$" | awk '{print \$1}'`
        echo -e "${name}.bam\t\${size}" >> bamSize.stats
        """
}

the .nextflow.log speaks about other process but it dont quite understand what it means

Mar-29 13:34:28.749 [main] DEBUG nextflow.cli.Launcher - $> nextflow -c CATCH.confi run CATCH.nf --bamData '/somepath*.bam' --bamDatapbi '/somepath*.bam.pbi' --barcodes /somepath/barcodes.fasta --minScoreDemux 80 --config /somepath/config.txt --target blabal --primers primers.fasta --canuSpec /somepath/Canu.spec --tailleCanuDeNovo 3G --seqInterne /somepath/merge_2_DSI_regions.fasta --publishdirResults /somepath/Results
Mar-29 13:34:28.971 [main] INFO  nextflow.cli.CmdRun - N E X T F L O W  ~  version 19.04.0
Mar-29 13:34:28.997 [main] INFO  nextflow.cli.CmdRun - Launching `CATCH.nf` [berserk_sax] - revision: c4df35f250
Mar-29 13:34:29.045 [main] DEBUG nextflow.config.ConfigBuilder - User config file: /somepath/CATCH.confi
Mar-29 13:34:29.045 [main] DEBUG nextflow.config.ConfigBuilder - Parsing config file: /somepath/CATCH.confi
Mar-29 13:34:29.114 [main] DEBUG nextflow.config.ConfigBuilder - Applying config profile: `standard`
Mar-29 13:34:30.341 [main] DEBUG nextflow.Session - Session uuid: 25da2eab-481e-42c1-babf-bfb3bf3dd89a
Mar-29 13:34:30.341 [main] DEBUG nextflow.Session - Run name: berserk_sax
Mar-29 13:34:30.342 [main] DEBUG nextflow.Session - Executor pool size: 2
Mar-29 13:34:30.370 [main] DEBUG nextflow.cli.CmdRun -
  Version: 19.04.0 build 5069
  Modified: 17-04-2019 06:25 UTC (08:25 CEST)
  System: Linux 3.10.0-1160.el7.x86_64
  Runtime: Groovy 2.5.6 on OpenJDK 64-Bit Server VM 1.8.0_262-b10
  Encoding: UTF-8 (UTF-8)
  Process: 36284@node120 [192.168.1.120]
  CPUs: 1 - Mem: 251.6 GB (82 GB) - Swap: 0 (0)
Mar-29 13:34:30.455 [main] DEBUG nextflow.Session - Work-dir: /somepath/work [gpfs]
Mar-29 13:34:30.456 [main] DEBUG nextflow.Session - Script base path does not exist or is not a directory: /somepath/bin
Mar-29 13:34:30.704 [main] DEBUG nextflow.Session - Session start invoked
Mar-29 13:34:30.708 [main] DEBUG nextflow.processor.TaskDispatcher - Dispatcher > start
Mar-29 13:34:30.722 [main] DEBUG nextflow.script.ScriptRunner - > Script parsing
Mar-29 13:34:31.534 [main] DEBUG nextflow.script.ScriptRunner - > Launching execution
Mar-29 13:34:31.596 [PathVisitor-1] DEBUG nextflow.file.PathVisitor - files for syntax: glob; folder: /somepath/; pattern: pattern*.bam; options: [:]
Mar-29 13:34:31.747 [PathVisitor-1] DEBUG nextflow.file.PathVisitor - files for syntax: glob; folder: /somepath/; pattern: pattern*.bam.pbi; options: [:]
Mar-29 13:34:32.015 [main] DEBUG nextflow.processor.ProcessFactory - << taskConfig executor: slurm
Mar-29 13:34:32.016 [main] DEBUG nextflow.processor.ProcessFactory - >> processorType: 'slurm'
Mar-29 13:34:32.044 [main] DEBUG nextflow.executor.Executor - Initializing executor: slurm
Mar-29 13:34:32.046 [main] INFO  nextflow.executor.Executor - [warm up] executor > slurm
Mar-29 13:34:32.067 [main] DEBUG n.processor.TaskPollingMonitor - Creating task monitor for executor 'slurm' > capacity: 100; pollInterval: 5s; dumpInterval: 5m
Mar-29 13:34:32.070 [main] DEBUG nextflow.processor.TaskDispatcher - Starting monitor: TaskPollingMonitor
Mar-29 13:34:32.070 [main] DEBUG n.processor.TaskPollingMonitor - >>> barrier register (monitor: slurm)
Mar-29 13:34:32.085 [main] DEBUG nextflow.executor.Executor - Invoke register for executor: slurm
Mar-29 13:34:32.085 [main] DEBUG n.executor.AbstractGridExecutor - Creating executor 'slurm' > queue-stat-interval: 1m
Mar-29 13:34:32.134 [main] DEBUG nextflow.Session - >>> barrier register (process: init)
Mar-29 13:34:32.136 [main] DEBUG nextflow.processor.TaskProcessor - Creating operator > init -- maxForks: 2
Mar-29 13:34:32.322 [main] DEBUG nextflow.processor.ProcessFactory - << taskConfig executor: slurm
Mar-29 13:34:32.322 [main] DEBUG nextflow.processor.ProcessFactory - >> processorType: 'slurm'
Mar-29 13:34:32.323 [main] DEBUG nextflow.executor.Executor - Initializing executor: slurm
Mar-29 13:34:32.324 [main] DEBUG n.executor.AbstractGridExecutor - Creating executor 'slurm' > queue-stat-interval: 1m
Mar-29 13:39:37.544 [main] DEBUG nextflow.Session - >>> barrier register (process: rename)
Mar-29 13:39:37.545 [main] DEBUG nextflow.processor.TaskProcessor - Creating operator > rename -- maxForks: 2
Mar-29 13:39:37.569 [main] DEBUG nextflow.processor.ProcessFactory - << taskConfig executor: slurm
Mar-29 13:39:37.569 [main] DEBUG nextflow.processor.ProcessFactory - >> processorType: 'slurm'
Mar-29 13:39:37.569 [main] DEBUG nextflow.executor.Executor - Initializing executor: slurm
Mar-29 13:39:37.570 [main] DEBUG n.executor.AbstractGridExecutor - Creating executor 'slurm' > queue-stat-interval: 1m
Mar-29 13:39:37.570 [main] DEBUG nextflow.Session - >>> barrier register (process: bam2fastaCorrection)
Mar-29 13:39:37.571 [main] DEBUG nextflow.processor.TaskProcessor - Creating operator > bam2fastaCorrection -- maxForks: 2
Mar-29 13:39:37.595 [main] DEBUG nextflow.processor.ProcessFactory - << taskConfig executor: slurm
Mar-29 13:39:37.595 [main] DEBUG nextflow.processor.ProcessFactory - >> processorType: 'slurm'
Mar-29 13:39:37.595 [main] DEBUG nextflow.executor.Executor - Initializing executor: slurm
Mar-29 13:39:37.595 [main] DEBUG n.executor.AbstractGridExecutor - Creating executor 'slurm' > queue-stat-interval: 1m
Mar-29 13:39:37.596 [main] DEBUG nextflow.Session - >>> barrier register (process: statsCorrection)
Mar-29 13:39:37.596 [main] DEBUG nextflow.processor.TaskProcessor - Creating operator > statsCorrection -- maxForks: 2
Mar-29 13:39:37.607 [main] DEBUG nextflow.processor.ProcessFactory - << taskConfig executor: slurm
Mar-29 13:39:37.607 [main] DEBUG nextflow.processor.ProcessFactory - >> processorType: 'slurm'
Mar-29 13:39:37.608 [main] DEBUG nextflow.executor.Executor - Initializing executor: slurm
Mar-29 13:39:37.608 [main] DEBUG n.executor.AbstractGridExecutor - Creating executor 'slurm' > queue-stat-interval: 1m
Mar-29 13:39:37.609 [main] DEBUG nextflow.Session - >>> barrier register (process: demultiplexage)
Mar-29 13:39:37.609 [main] DEBUG nextflow.processor.TaskProcessor - Creating operator > demultiplexage -- maxForks: 2
Mar-29 13:39:37.895 [main] DEBUG nextflow.processor.ProcessFactory - << taskConfig executor: slurm
Mar-29 13:39:37.895 [main] DEBUG nextflow.processor.ProcessFactory - >> processorType: 'slurm'
Mar-29 13:39:37.895 [main] DEBUG nextflow.executor.Executor - Initializing executor: slurm
Mar-29 13:39:37.895 [main] DEBUG n.executor.AbstractGridExecutor - Creating executor 'slurm' > queue-stat-interval: 1m
Mar-29 13:39:37.896 [main] DEBUG nextflow.Session - >>> barrier register (process: nomenclature)
Mar-29 13:39:37.896 [main] DEBUG nextflow.processor.TaskProcessor - Creating operator > nomenclature -- maxForks: 2
Mar-29 13:39:37.904 [main] DEBUG nextflow.processor.ProcessFactory - << taskConfig executor: slurm
Mar-29 13:39:37.912 [main] DEBUG nextflow.processor.ProcessFactory - >> processorType: 'slurm'
Mar-29 13:39:37.913 [main] DEBUG nextflow.executor.Executor - Initializing executor: slurm
Mar-29 13:39:37.913 [main] DEBUG n.executor.AbstractGridExecutor - Creating executor 'slurm' > queue-stat-interval: 1m
Mar-29 13:39:37.913 [main] DEBUG nextflow.Session - >>> barrier register (process: bam2fastaDemultiplexage)
Mar-29 13:39:37.913 [main] DEBUG nextflow.processor.TaskProcessor - Creating operator > bam2fastaDemultiplexage -- maxForks: 2
Mar-29 13:39:37.942 [main] DEBUG nextflow.processor.ProcessFactory - << taskConfig executor: slurm
Mar-29 13:39:37.942 [main] DEBUG nextflow.processor.ProcessFactory - >> processorType: 'slurm'
Mar-29 13:39:37.943 [main] DEBUG nextflow.executor.Executor - Initializing executor: slurm
Mar-29 13:39:37.943 [main] DEBUG n.executor.AbstractGridExecutor - Creating executor 'slurm' > queue-stat-interval: 1m
Mar-29 13:39:37.943 [main] DEBUG nextflow.Session - >>> barrier register (process: statsDemultiplexage)
Mar-29 13:39:37.943 [main] DEBUG nextflow.processor.TaskProcessor - Creating operator > statsDemultiplexage -- maxForks: 2
Mar-29 13:39:37.953 [main] DEBUG nextflow.script.ScriptRunner - > Await termination
Mar-29 13:39:37.953 [main] DEBUG nextflow.Session - Session await
Mar-29 13:39:37.998 [Task submitter] DEBUG nextflow.executor.GridTaskHandler - [SLURM] submitted process init > jobId: 33152594; workDir: /somepath/work/23/c994dde9ed99a00f4e190239f07998
Mar-29 13:39:38.002 [Task submitter] INFO  nextflow.Session - [23/c994dd] Submitted process > init
Mar-29 13:39:42.347 [Task monitor] DEBUG n.processor.TaskPollingMonitor - Task completed > TaskHandler[jobId: 33152594; id: 1; name: init; status: COMPLETED; exit: 0; error: -; workDir: /somepath/work/23/c994dde9ed99a00f4e190239f07998 started: 1648553982321; exited: 2022-03-29T11:39:40.340241Z; ]
Mar-29 13:39:42.364 [Actor Thread 3] DEBUG nextflow.Session - <<< barrier arrive (process: rename)
Mar-29 13:39:42.391 [Actor Thread 3] DEBUG nextflow.Session - <<< barrier arrive (process: bam2fastaCorrection)
Mar-29 13:39:42.392 [Actor Thread 3] DEBUG nextflow.Session - <<< barrier arrive (process: statsCorrection)
Mar-29 13:39:42.393 [Actor Thread 3] DEBUG nextflow.Session - <<< barrier arrive (process: init)
Mar-29 13:39:42.400 [Actor Thread 8] DEBUG nextflow.Session - <<< barrier arrive (process: demultiplexage)
Mar-29 13:39:42.421 [Actor Thread 8] DEBUG nextflow.Session - <<< barrier arrive (process: bam2fastaDemultiplexage)
Mar-29 13:39:42.424 [Actor Thread 9] DEBUG nextflow.Session - <<< barrier arrive (process: nomenclature)
Mar-29 13:39:42.456 [Actor Thread 10] DEBUG nextflow.Session - <<< barrier arrive (process: statsDemultiplexage)
Mar-29 13:39:42.456 [main] DEBUG nextflow.Session - Session await > all process finished
Mar-29 13:39:47.290 [Task monitor] DEBUG n.processor.TaskPollingMonitor - <<< barrier arrives (monitor: slurm)
Mar-29 13:39:47.292 [main] DEBUG nextflow.Session - Session await > all barriers passed
Mar-29 13:39:47.300 [main] DEBUG nextflow.trace.StatsObserver - Workflow completed > WorkflowStats[succeedCount=1; failedCount=0; ignoredCount=0; cachedCount=0; succeedDuration=19ms; failedDuration=0ms; cachedDuration=0ms]
Mar-29 13:39:47.544 [main] DEBUG nextflow.CacheDB - Closing CacheDB done
Mar-29 13:39:47.569 [main] DEBUG nextflow.script.ScriptRunner - > Execution complete -- Goodbye
N E X T F L O W  ~  version 19.04.0
Launching `CATCH.nf` [mighty_joliot] - revision: 79006232db
debut Script
Donnees = /work/project/gaia/Samplix_test*.bam
executor >  slurm (1)
[23/c994dd] process > init [100%] 1 of 1 ✔
Completed at: 29-Mar-2022 13:39:47
Duration    : 11.4s
CPU hours   : (a few seconds)
Succeeded   : 1

all this while when it runs normally (the exact same way to run etc.) there are many process.

What could have gone wrong ? what could be the reason for nextflow not to chain with next process ?

Nicolas

Upvotes: 2

Views: 801

Answers (2)

Steve
Steve

Reputation: 54502

Process execution stops if one or more input channels are empty. It's likely your input channel declared here is empty:

bampbi_ch = bamtuple_ch.join(pbituple_ch).combine(init_ch)

This could be because one or more component channels (i.e. bamtuple_ch, pbituple_ch, init_ch) are empty. However, the usual culprit in this case is that the join operation hasn't succeeded in the way it was intended.

The join operator creates a channel that joins together the items emitted by two channels for which exists a matching key. The key is defined, by default, as the first element in each item emitted.

The default behavior (which can be changed with the remainder parameter) is for incomplete tuples to be discarded. Check that bamtuple_ch and pbituple_ch produce tuples that can be joined using a shared/common key as the first element.

Upvotes: 2

ul.Duc
ul.Duc

Reputation: 41

Ok so the join was indeed the problem. but not on the side of init, more on the pbituple that was empty because the files were not found because of a / missing. sorry about that and TIL about the logic of Nextflow !

Upvotes: 1

Related Questions