I am performing a scatter-gather operation in Nextflow. It looks like this:
```groovy
reads = PATH+"test_1.fq"
outdir = "results"

split_read_ch = channel.fromFilePairs(reads, checkIfExists: true, flat:true ).splitFastq( by: 10, file:"test_split" )

process Scatter_fastP {

    tag 'Scatter_fastP'
    publishDir outdir

    input:
    tuple val(name), path(reads) from split_read_ch

    output:
    file "${reads}.trimmed.fastq" into gather_fastp_ch

    script:
    """
    fastp -i ${reads} -o ${reads}.trimmed.fastq
    """
}

gather_fastp_ch.collectFile().view().println{ it.text }
```
I run this code with all the tracing options described in the Nextflow documentation (https://www.nextflow.io/docs/latest/tracing.html):

```shell
nextflow run main.nf -with-report nextflow_report -with-trace nextflow_trace -with-timeline nextflow_timeline -with-dag nextflow_dag.html
```
These files show the resource usage and run time of the 10 Scatter_fastP tasks. But I would also like to measure the resources and time taken to create the split_read_ch and gather_fastp_ch channels.

I have tried to move the channel creation into processes, but I cannot get it to work. Is there a way to include channel creation in the tracing files? Or is there a way I have not found to create these channels inside processes?
Thank you in advance for your help.
Although Nextflow can parse FASTQ files and split them into smaller files (for example with the splitFastq operator), channel operators run inside the main Nextflow process rather than as tasks, so they never appear in the trace, report or timeline. It is generally better to hand these operations off to a separate process (or set of processes), especially if your input FASTQ files are large. This has two benefits: (1) the main Nextflow process does less work, and (2) you get per-task resource and timing stats for the splitting step in your Nextflow reports.
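Once a step runs as a process, its tasks are listed in the trace file alongside all the others. A minimal sketch for tallying tasks per process, assuming the default tab-separated trace written by `-with-trace`, with a header row containing a `name` column whose values look like `fastp (3)`. `summarize_trace` is a hypothetical helper, not part of Nextflow:

```shell
#!/bin/sh
# Hypothetical helper: count the tasks each process contributed to a Nextflow trace file.
# Assumes a tab-separated trace whose header row has a "name" column.
summarize_trace() {
    awk -F'\t' '
        NR == 1 {
            # Find the "name" column from the header instead of hard-coding its position
            for (i = 1; i <= NF; i++) if ($i == "name") col = i
            if (!col) { print "no name column in header" > "/dev/stderr"; exit 1 }
            next
        }
        {
            proc = $col
            sub(/ \(.*\)$/, "", proc)   # strip the trailing " (tag)" to get the process name
            count[proc]++
        }
        END { for (p in count) printf "%s\t%d\n", p, count[p] }
    ' "$1" | sort
}
```

With the command from the question, this would be called as `summarize_trace nextflow_trace`; after moving the splitting into a process, `split_fastq` should appear there next to `fastp`.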
The following example uses GNU split to split the input FASTQ files, then gathers the outputs using the groupTuple() operator together with the groupKey() built-in, so that each group is emitted as soon as all of its chunks have been processed rather than only at the end of the run. It expects gzipped inputs, so you'll need to adapt it for your uncompressed ones:
```groovy
nextflow.enable.dsl=2

params.num_lines = 40000      // lines per chunk; keep it a multiple of 4 (one FASTQ record = 4 lines)
params.suffix_length = 5

process split_fastq {

    input:
    tuple val(name), path(fastq)

    output:
    // Glob such as "sample-[0-9][0-9][0-9][0-9][0-9].fastq.gz"
    tuple val(name), path("${name}-${/[0-9]/*params.suffix_length}.fastq.gz")

    shell:
    '''
    zcat "!{fastq}" | split \\
        -a "!{params.suffix_length}" \\
        -d \\
        -l "!{params.num_lines}" \\
        --filter='gzip > ${FILE}.fastq.gz' \\
        - \\
        "!{name}-"
    '''
}

process fastp {

    input:
    tuple val(name), path(fastq)

    output:
    tuple val(name), path("${fastq.getBaseName(2)}.trimmed.fastq.gz")

    script:
    """
    fastp -i "${fastq}" -o "${fastq.getBaseName(2)}.trimmed.fastq.gz"
    """
}

workflow {

    Channel.fromFilePairs( './data/*.fastq.gz', size: 1 ) \
        | split_fastq \
        | map { name, fastq ->
            // A glob output matching a single file is emitted as a single path, not a list
            def chunks = fastq instanceof List ? fastq : [fastq]
            tuple( groupKey(name, chunks.size()), chunks )
        } \
        | transpose() \
        | fastp \
        | groupTuple() \
        | map { key, fastqs -> tuple( key.toString(), fastqs ) } \
        | view()
}
```
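To see what `split_fastq` hands to `fastp`, the same split call can be tried outside Nextflow. This is a standalone sketch, assuming GNU coreutils (`split --filter` is a GNU extension) and a small made-up input: 10 synthetic reads split 2 per chunk (`-l 8`). It also checks that simply concatenating the gzipped chunks restores the original reads, which is the kind of command a separate gather process could run if you also want the gather step to show up in the trace. The file names `sample` and `merged` are hypothetical:

```shell
#!/bin/sh
# Standalone sketch of scatter-gather using the same split flags as split_fastq.
# Assumes GNU coreutils (split --filter), gzip and zcat are available.
set -eu

workdir=$(mktemp -d)
cd "$workdir"

# Hypothetical input: 10 four-line FASTQ records, gzipped
for i in $(seq 1 10); do
    printf '@read%d\nACGT\n+\nIIII\n' "$i"
done | gzip > sample.fastq.gz

# Scatter: -l 8 keeps 2 whole records per chunk (the line count must be a multiple of 4)
zcat sample.fastq.gz | split \
    -a 5 \
    -d \
    -l 8 \
    --filter='gzip > ${FILE}.fastq.gz' \
    - \
    "sample-"

ls sample-*.fastq.gz    # sample-00000.fastq.gz ... sample-00004.fastq.gz

# Gather: concatenated gzip files form a valid multi-member gzip stream
cat sample-*.fastq.gz > merged.fastq.gz
zcat merged.fastq.gz > merged.fastq
zcat sample.fastq.gz | cmp - merged.fastq && echo "round trip OK"
```

Because `split` names the chunks with fixed-width numeric suffixes, the shell glob lists them in their original order, so plain `cat` is enough to reassemble them.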