Vickie
Vickie

Reputation: 43

Nextflow DSL2: how to combine outputs (channels) from multiple processes into input of another process by part of filename?

I'm trying to combine outputs from two separate processes A and B, where each of them outputs multiple files, into input of process C. All file names have in common a chromosome number(for example "chr1"). The process A outputs files: /path/chr1_qc.vcf.gz, /path/chr2_qc.vcf.gz and etc (genotype files).

Process B outputs files: /path/chr1.a.bcf, /path/chr1.b.bcf, /path/chr1.c.bcf.../path/chr2.a.bcf, /path/chr2.b.bcf and etc (region files). And the number of both file-sets could vary each time.

Part of the code:

process A {
  module "bcftools/1.16"
  publishDir "${params.out_dir}", mode: 'copy', overwrite: true
  input:
  path vcf
  path tbi

  output:
  path ("${(vcf =~ /chr\d{1,2}/)[0]}_qc.vcf.gz")
 
  script:
  """
  bcftools view -R ${params.sites_list} -Oz -o ${(vcf =~ /chr\d{1,2}/)[0]}_qc.vcf.gz ${vcf} //generates QC-ed genome files
  tabix -f ${(vcf =~ /chr\d{1,2}/)[0]}_qc.vcf.gz //indexing QC-ed genomes
  """
}

process B {
  publishDir "${params.out_dir}", mode: 'copy', overwrite: true
  input:
  path(vcf)

  output:
  tuple path("${(vcf =~ /chr\d{1,2}/)[0]}.*.bed")

  script:
  """
  python split_chr.py ${params.chr_lims} ${vcf} //generates region files
  """
}


process C {
  publishDir "${params.out_dir}", mode: 'copy', overwrite: true
  input:
  tuple path(vcf), path(bed)
  
  output:
  path "${bed.SimpleName}.vcf.gz"

  script:
  """
  bcftools view -R ${bed} -Oz -o ${bed.SimpleName}.vcf.gz ${vcf}
  """
}

workflow {
   A(someprocess.out)
   B(A.out)
   
   C(combined_AB_files)
}

Process B output.view() output:

[/path/chr1.a.bed, /path/chr1.b.bed]
[/path/chr2.a.bed, /path/chr2.b.bed]

How can I get the process C to receive an input as a channel of tuples (A and B outputs combined by chromosome name) like this:

[ /path/chr1_qc.vcf.gz, /path/chr1.a.bcf ]
[ /path/chr1_qc.vcf.gz, /path/chr1.b.bcf ]
...
[ /path/chr2_qc.vcf.gz, /path/chr2.a.bcf ]
...    

Upvotes: 3

Views: 2907

Answers (2)

Steve
Steve

Reputation: 54502

I think what you want is the second form of the combine operator, which allows you to combine items that share a matching key using the by parameter. If one or more of your channels are missing a shared key in the first element, you can just use the map operator to produce such a key. To get the desired output, use the transpose operator and specify the index (zero based) of the element to be transposed, again using the by parameter. For example:

workflow {

    Channel
        .fromPath( './data/*.bed' )
        .map { tuple( it.simpleName, it ) }
        .groupTuple()
        .set { bed_files }

    Channel
        .fromPath( './data/*_qc.vcf.gz' )
        .map { tuple( it.simpleName - ~/_qc$/, it ) }
        .combine( bed_files, by: 0 )
        .transpose( by: 2 )
        .map { chrom, vcf, bed -> tuple( vcf, bed ) }
        .view()
}

Results:

$ touch ./data/chr{1..3}.{a..c}.bed
$ touch ./data/chr{1..3}_qc.vcf.gz
$ nextflow run main.nf
N E X T F L O W  ~  version 22.10.0
Launching `main.nf` [pedantic_woese] DSL2 - revision: 9c5abfca90
[/data/chr1_qc.vcf.gz, /data/chr1.c.bed]
[/data/chr1_qc.vcf.gz, /data/chr1.a.bed]
[/data/chr1_qc.vcf.gz, /data/chr1.b.bed]
[/data/chr2_qc.vcf.gz, /data/chr2.b.bed]
[/data/chr2_qc.vcf.gz, /data/chr2.a.bed]
[/data/chr2_qc.vcf.gz, /data/chr2.c.bed]
[/data/chr3_qc.vcf.gz, /data/chr3.c.bed]
[/data/chr3_qc.vcf.gz, /data/chr3.b.bed]
[/data/chr3_qc.vcf.gz, /data/chr3.a.bed]

Note that when two or more queue channels are declared as process inputs (like in your process A), the process will block until it receives a value from each input channel. As these are run in parallel and asynchronously, there's no guarantee that items will be emitted in the order that they were received. This can result in mix-ups, where for example, you unexpectedly end up with an index file that belongs to another VCF. Most of the time, what you want is one queue channel and one or more value channels. The section in the docs on multiple input channels explains this quite well in my opinion, and is well worth the time reading if you haven't already. Also, joining and combining channels becomes a lot easier when your processes define tuples in their input and output declarations, where the first element is a key, like a sample name/id. I think you want something something like the following:

params.vcf_files = './data/*.vcf.gz{,.tbi}'
params.sites_list = './data/sites.tsv'
params.chr_lims = './data/file.txt'

params.outdir = './results'
process proc_A {

    tag "${sample}: ${indexed_vcf.first()}"

    publishDir "${params.outdir}/proc_A", mode: 'copy', overwrite: true

    module "bcftools/1.16"

    input:
    tuple val(sample), path(indexed_vcf)
    path sites_list

    output:
    tuple val(sample), path("${sample}_qc.vcf.gz{,.tbi}")

    script:
    def vcf = indexed_vcf.first()

    """
    bcftools view \\
        -R "${sites_list}" \\
        -Oz \\
        -o "${sample}_qc.vcf.gz" \\
        "${vcf}"
    bcftools index \\
        -t \\
        "${sample}_qc.vcf.gz"
    """
}
process proc_B {

    tag "${sample}: ${indexed_vcf.first()}"

    publishDir "${params.outdir}/proc_B", mode: 'copy', overwrite: true

    input:
    tuple val(sample), path(indexed_vcf)
    path chr_lims

    output:
    tuple val(sample), path("*.bed")

    script:
    def vcf = indexed_vcf.first()

    """
    split_chr.py "${chr_lims}" "${vcf}"
    """
}
process proc_C {

    tag "${sample}: ${indexed_vcf.first()}: ${bed.name}"

    publishDir "${params.outdir}/proc_C", mode: 'copy', overwrite: true

    input:
    tuple val(sample), path(indexed_vcf), path(bed)

    output:
    tuple val(sample), path("${bed.simpleName}.vcf.gz")

    script:
    def vcf = indexed_vcf.first()

    """
    bcftools view \\
        -R "${bed}" \\
        -Oz \\
        -o "${bed.simpleName}.vcf.gz" \\
        "${vcf}"
    """
}
workflow {

    vcf_files = Channel.fromFilePairs( params.vcf_files )

    sites_list = file( params.sites_list )
    chr_lims = file( params.chr_lims )

    proc_A( vcf_files, sites_list )
    proc_B( proc_A.out, chr_lims )

    proc_A.out \
        | combine( proc_B.out, by: 0 ) \
        | map { sample, indexed_vcf, bed_files ->
            bed_list = bed_files instanceof Path ? [bed_files] : bed_files

            tuple( sample, indexed_vcf, bed_list )
        } \
        | transpose( by: 2 ) \
        | proc_C \
        | view()
}

The above should produce results like:

$ nextflow run main.nf
N E X T F L O W  ~  version 22.10.0
Launching `main.nf` [mighty_elion] DSL2 - revision: 5ea25ae72c
executor >  local (15)
[b4/08df9d] process > proc_A (foo: foo.vcf.gz)           [100%] 3 of 3 ✔
[93/55e467] process > proc_B (foo: foo_qc.vcf.gz)        [100%] 3 of 3 ✔
[8b/cd7193] process > proc_C (foo: foo_qc.vcf.gz: b.bed) [100%] 9 of 9 ✔
[bar, ./work/90/53b9c6468ca54bb0f4eeb99ca82eda/a.vcf.gz]
[bar, ./work/24/cca839d5f63ee6988ead96dc9fbe1d/b.vcf.gz]
[bar, ./work/6f/61e1587134e68d2e358998f61f6459/c.vcf.gz]
[baz, ./work/f8/1484e94b9187ba6aae81d68f0a18cf/b.vcf.gz]
[baz, ./work/9c/20578262f5a2c13c6c3b566dc7b7d8/c.vcf.gz]
[baz, ./work/f5/3405b54f81f6f500a3ee4a78f5e6df/a.vcf.gz]
[foo, ./work/39/945fb0d3f375260e75afbc9caebc5d/a.vcf.gz]
[foo, ./work/de/cecd94ff39f204e799cb8e4c4ad46f/c.vcf.gz]
[foo, ./work/8b/cd7193107f6be5472d2e29982e3319/b.vcf.gz]

Also note that third party scripts, like your Python script, can be moved to a folder called bin in the root of your project repository (i.e. the same directory as your main.nf). And if you make your script executable, you will be able to invoke "as-is", i.e. without the need for an absolute path to it.

Upvotes: 4

mribeirodantas
mribeirodantas

Reputation: 489

This can be done with channel operators. Check the code below, with some comments:

workflow {

  // Let's start by building channels similar to the ones you described
  Channel
    .of(file('/path/chr1_qc.vcf.gz'), file('/path/chr2_qc.vcf.gz'))
    .set { pAoutput}
  Channel
    .of(file('/path/chr1.a.bcf'), file('/path/chr1.b.bcf'), file('/path/chr1.c.bcf'),
        file('/path/chr2.a.bcf'), file('/path/chr2.b.bcf'), file('/path/chr2.c.bcf'))
    .set { pBoutput }

  // Now, let's create keys to relate the elements in the two channels
  pAoutput
    .map { filepath -> [filepath.name.tokenize('_')[0], filepath ] }
    .set { pAoutput_tuple }
  // The channel now looks like this:
  // [chr1, /path/chr1_qc.vcf.gz]
  // [chr2, /path/chr2_qc.vcf.gz]
  pBoutput
    .map { filepath -> [filepath.name.tokenize('.')[0], filepath ] }
    .set { pBoutput_tuple }
  // And:
  // [chr1, /path/chr1.a.bcf]
  // [chr1, /path/chr1.b.bcf]
  // [chr1, /path/chr1.c.bcf]
  // [chr2, /path/chr2.a.bcf]
  // [chr2, /path/chr2.b.bcf]
  // [chr2, /path/chr2.c.bcf]

  // Combine the two channels and group by key
  pAoutput_tuple
    .mix(pBoutput_tuple)
    .groupTuple()
    .flatMap { chrom, path_list ->
      path_list.split {
        it.name.endsWith('.vcf.gz')
      }.combinations()
    }
    .view()
}

You can check the output below:

N E X T F L O W  ~  version 22.10.4
Launching `ex.nf` [maniac_pike] DSL2 - revision: f87873ef13
[/path/chr1_qc.vcf.gz, /path/chr1.a.bcf]
[/path/chr1_qc.vcf.gz, /path/chr1.b.bcf]
[/path/chr1_qc.vcf.gz, /path/chr1.c.bcf]
[/path/chr2_qc.vcf.gz, /path/chr2.a.bcf]
[/path/chr2_qc.vcf.gz, /path/chr2.b.bcf]
[/path/chr2_qc.vcf.gz, /path/chr2.c.bcf]

Upvotes: 1

Related Questions