Daffy
Daffy

Reputation: 51

How to process multiple samples as input in Nextflow?

I'm trying to learn nextflow but it's not going very well. I used NGS-based double-end sequencing data to build an analysis flow from fastq files to vcf files using Nextflow. However I got stuck right at the beginning, as shown in the code. The first process and the second porcess sworks fine, but when passing the files to the third process there is an ERROR and I can't execute the whole process anymore. What should I do? Thanks for a help.

Following is my code:

#! /usr/bin/env nextflow

params.fq1 = "/home/duxu/project/data/*1.fq.gz"
params.fq2 = "/home/duxu/project/data/*2.fq.gz"
params.index = "/home/duxu/project/result/index.list"
params.primer1 = "/home/duxu/project/data/primer_1.fasta"
params.primer2 = "/home/duxu/project/data/primer_2.fasta"
params.ref = "/home/duxu/project/data/hg19.p13.plusMT.no_alt_analysis_set/"
params.output='results'
params.refhg19 = "/home/duxu/project/data/hg19.p13.plusMT.no_alt_analysis_set/hg19.p13.plusMT.no_alt_analysis_set.fa"
params.Mills = "/home/duxu/project/data/Mills_and_1000G_gold_standard.indels.hg19.sites.vcf"
params.1000G = "/home/duxu/project/data/1000G_phase1.indels.hg19.sites.vcf"
params.dbsnp = "/home/duxu/project/data/dbsnp_138.hg19.vcf

fq2 = Channel.fromPath(params.fq2)
fq2 = Channel.fromPath(params.fq2)
index = Channel.fromPath(params.index)
index.into { index_1; index_2; index_3 }
primer1 = Channel.fromPath(params.primer1)
primer2 = Channel.fromPath(params.primer2)
ref = Channel.fromPath(params.ref)
refhg19 = Channel.fromPath(params.refhg19)
refhg19.into { refhg19_1; refhg19_2 ; refhg19_3; refhg19_4; refhg19_5}

Mills = Channel.fromPath(params.Mills)
1000G = Channel.fromPath(params.1000G)
dbsnp = Channel.fromPath(params.dbsnp)

This is first process:

process soapnuke{
    conda'soapnuke'
    tag{"soapnuk ${fq1} ${fq2}"}
    publishDir "${params.outdir}/SOAPnuke", mode: 'copy'
    input:
        file rawfq1 from fq1
        file rawfq2 from fq2

    output:
        file 'clean1.fastq.gz' into clean_fq1
        file 'clean2.fastq.gz' into clean_fq2

script:
    """
    SOAPnuke filter -1 $rawfq1 -2 $rawfq2 -l 12 -q 0.5 -Q 2 -o . \
        -C clean1.fastq.gz -D clean2.fastq.gz --trim 8,0,8,0
    """
}

The second process

process barcode_splitter{

    tag{"barcode_splitter"}
    publishDir "${params.output}/barcode_splitter", mode: 'copy'
    input:
        file split1 from clean_fq1
        file split2 from clean_fq2
        file index from index_1

    output:
       file '*-read-1.fastq.gz' into trimmed_index1
       file '*-read-2.fastq.gz' into trimmed_index2

    script:
    """
    barcode_splitter --bcfile $index $split1 $split2  --idxread 1 2 --mismatches 1 --suffix .fastq --gzipout

    mv multimatched-read-1.fastq.gz multicatched.1.fastq.gz
    mv multimatched-read-2.fastq.gz multicatched.2.fastq.gz
    mv untimatched-read-1.fastq.gz multicatched.1.fastq.gz
    mv untimatched-read-2.fastq.gz multicatched.2.fastq.gz
    """
}

The third ,and I got an error from this step. In fact, this process has multiple samples, since the previous process baicode_splitter output multiple files. This process cutadapt is designed to excise the first few bases of multiple samples.

process cutadapt{

    tag{"cutadapt"}
    publishDir "${params.output}/cut_primer", mode: 'copy'
      input:
         val sample from sample
         file primer_1 from primer1
         file primer_2 from primer2
         file ${sample}-read-1.fastq.gz from trimmed_index1.collect()
         file ${sample}-read-2.fastq.gz from trimmed_index2.collect()

      output:
         file '*.trim.1.fastq.gz' into trimmed_primer1
         file '*.trim.2.fastq.gz' into trimmed_primer2

   script:
    """
   cutadapt -g file:$primer_1 -G file:$primer_2 -j 64  --discard-untrimmed -o \${sample}.trim.1.fastq.gz  -p \$(sample}.trim.2.fastq.gz ${sample}-read-1.fastq.gz ${sample}-read-2.fastq.gz
    """
}

The fourth process is designed to match multiple samples to a reference genome

process bwa_mapping{
     tag{bwa_maping}
     publishDir "${params.output}/barcode_splitter", mode: 'copy'
       input:
       file pair1 from trimmed_primer1
       file pair2 from trimmed_primer2
       file refhg19 from refhg19_1

       output:
       file '*_R1_R2.bam' into addheader

   script:
    """
    bwa mem -t 64 $refhg19 trimmed_primer_*-read-1.fastq trimmed_primer_*-read-2.fastq | samtools view -@8 -b | samtools sort -m 2G -@64 > * _R1_R2.bam
    """
}

The next remaining processes are all multisample-based operations

process {BaseRecalibrator
    tag{"BaseRecalibrator"}
    publishDir "${params.output}/BQSR/BaseRecalibratoraddheader", mode: 'copy'
      input:
         file BaseRecalibrator from BaseRecalibrator_1
         file refhg19 from refhg19_2
         file Mills from Mills
         file 1000G from 1000G
         file dbsnp from dbsnp

      output:
         file '*.recal.table' into ApplyBQSR

 script:
    """
   gatk --java-options "-XX:ParallelGCThreads=2" BaseRecalibrator -I BaseRecalibrator -R $refhg19_2  --known-sites  $Mills --known-sites $1000G --known-sites $dbsnp -O *.recal.table

    """
}

process {ApplyBQSR
    tag{"ApplyRecalibrator"}
    publishDir "${params.output}/BQSR/ApplyRecalibrator", mode: 'copy'
      input:
         file ApplyBQSR from ApplyBQSR
         file refhg19 from refhg19_3
         file BaseRecalibrator from BaseRecalibrator_2

      output:
         file '*.bam' into HaplotypeCaller

 script:
    """
   gatk --java-options "-XX:ParallelGCThreads=2" ApplyBQSR -I BaseRecalibrator -R $refhg19_3  --bqsr-recal-file $AppleBQSR -O *.bam
    """
}


process {HaplotypeCaller
    tag{"HaplotypeCallerr"}
    publishDir "${params.output}/GATK/HaplotypeCaller", mode: 'copy'
      input:
         file HaplotypeCaller from  HaplotypeCaller
         file refhg19 from refhg19_4
         file BaseRecalibrator from BaseRecalibrator_3

      output:
         file '*.g.vcf.gz' into GenotypeGVCFs

 script:
    """
   gatk --java-options "-XX:ParallelGCThreads=2" HaplotypeCaller  -R $refhg19_4 -I BaseRecalibrator -O *.g.vcf.gz -ERC GVCF
    """
}


process {GenotypeGVCFs
    tag{"GenotypeGVCFs"}
    publishDir "${params.output}/GATK/GenotypeGVCFs", mode: 'copy'
      input:
         file GenotypeGVCFs from GenotypeGVCFs
         file refhg19 from refhg19_5

      output:
         file '*.vcf.gz' into end

 script:
    """
   gatk --java-options "-XX:ParallelGCThreads=2" GenotypeGVCFs  -R $refhg19_5 -V $GenotypeGVCFs -O *.vcf.gz

    """

Upvotes: 0

Views: 1134

Answers (1)

Steve
Steve

Reputation: 54502

Not sure what the error is exactly, but maybe it doesn't matter. It looks like you've declared more than one queue channels in your 'cutadapt' input declaration. Usually you don't want to do this. Please see: understand how multiple input channels work.

Note that the Channel.fromPath factory method creates a queue channel. Here, 'primer1' and 'primer2' are both queue channels that each provide only a single value:

params.primer1 = "/home/duxu/project/data/primer_1.fasta"
params.primer2 = "/home/duxu/project/data/primer_2.fasta"

primer1 = Channel.fromPath(params.primer1)
primer2 = Channel.fromPath(params.primer2)

What you want instead are value channels which can be read an infinite number of times without consuming their content:

params.primer1 = "/home/duxu/project/data/primer_1.fasta"
params.primer2 = "/home/duxu/project/data/primer_2.fasta"

primer1 = file(params.primer1)
primer2 = file(params.primer2)

A typically process, which involves executing a process for each of n samples, will involve a single queue channel and zero or more value channels. For example:

process cutadapt{

    tag { sample }

    publishDir "${params.output}/cut_primer", mode: 'copy'
    cpus 64

    input:
    tuple val(sample), path(fq1), path(fq2) from trimmed_index
    path primer1
    path primer2

    output:
    tuple val(sample), path("${sample}.trim.1.fq.gz"), path("${sample}.trim.2.fq.gz")

    script:
    """
    cutadapt \\
        -g "file:${primer1}" \\
        -G "file:${primer2} \\
        -j ${task.cpus} \\
        --discard-untrimmed \\
        -o "${sample}.trim.1.fq.gz" \\
        -p "${sample}.trim.2.fq.gz" \\
        "${fq1}" \\
        "${fq2}"
    """
}

Note that when the file input name is the same as the channel name, the from channel declaration can be omitted. I also used the path qualifier above, as it should be preferred over the file qualifier when using Nextflow 19.10.0 or later.

Upvotes: 1

Related Questions