Matthew Kozubov

Reputation: 99

Is there a way to access/modify the contents of a Nextflow channel?

I have a situation where my workflow outputs a main directory, which I emit from a process using DSL2. I feed this output to a Python script, which can easily loop over the sub-directories and their respective files, pulling out information and compiling it into a .tsv file.

The two important pieces of information the Python script collects are the name of each subdirectory and which file within that subdirectory is actually important.

I would like to take my process output ("root dir") + subdirectory (from file) + important filename (from file) and make it into a new generator path to feed to another process.

Am I just using a bad method? Is there a better way to access a generator? In the documentation I saw subscribe, but I haven't had luck using this functionality. Thank you in advance.

Example .tsv file (columns 1 and 3 are what I want to append to the generator)

GCF_000005845.2 Escherichia coli str. K-12 substr. MG1655, complete genome      GCF_000005845.2_ASM584v2_genomic.fna
GCF_000008865.2 Escherichia coli O157:H7 str. Sakai DNA, complete genome        GCF_000008865.2_ASM886v2_genomic.fna

Work directory structure

├── c6
│   └── 6598d4838f61d0421f03216990465c
│       ├── ecoli
│       │   ├── README.md
│       │   └── ncbi_dataset
│       │       ├── data
│       │       │   ├── GCF_000005845.2
│       │       │   │   ├── GCF_000005845.2_ASM584v2_genomic.fna
│       │       │   │   ├── genomic.gff
│       │       │   │   ├── protein.faa
│       │       │   │   └── sequence_report.jsonl
│       │       │   ├── GCF_000008865.2
│       │       │   │   ├── GCF_000008865.2_ASM886v2_genomic.fna
│       │       │   │   ├── genomic.gff
│       │       │   │   ├── protein.faa
│       │       │   │   └── sequence_report.jsonl
│       │       │   ├── assembly_data_report.jsonl
│       │       │   └── dataset_catalog.json
│       │       └── fetch.txt

Here is my nextflow script (constructive criticism very welcome):

#!/usr/bin/env nextflow

nextflow.enable.dsl=2

workflow {

  //ref_genome_ch = Channel.fromPath("$params.ref_genome")
  println([params.taxon, params.zipName, params.unzippedDir])
  DOWNLOAD_ZIP(params.taxon, params.zipName)
  UNZIP(DOWNLOAD_ZIP.out.zipFile)
  REHYDRATE(UNZIP.out.unzippedDir)
  COLLECT_NAMES(REHYDRATE.out.dataDir)


  // I want to get the dir name and file name out of
  // relations.txt
  //thing = Channel.from(  )
  //thing.view()
  //organism_genomes = REHYDRATE.out.dataDir.subscribe { println("$it/")}

}

process DOWNLOAD_ZIP {
  errorStrategy 'ignore'

  input:
  val taxonName
  val zipName

  output:
  path "${zipName}" , emit: zipFile

  script:
  def reference = params.reference
  """
  datasets download genome \\
     taxon '${taxonName}' \\
     --dehydrated \\
     --filename ${zipName} \\
     ${reference} \\
     --exclude-genomic-cds
  """

}


process UNZIP {
  input:
  path zipFile

  output:
  path "${zipFile.baseName}" , emit: unzippedDir

  script:
  """
  unzip $zipFile -d ${zipFile.baseName}
  """

}


process REHYDRATE {
  input:
  path unzippedDir

  output:
  path "$unzippedDir/ncbi_dataset/data" , emit: dataDir

  script:
  """
  datasets rehydrate \\
     --directory $unzippedDir
  """
}



process COLLECT_NAMES {
  publishDir params.results

  input:
  path dataDir

  output:
  path "relations.txt" , emit: org_names

  script:
  """
  python "$baseDir/bin/collect_org_names.py" $dataDir
  """

}



Edit: User @Steve recommended channel operators. I don't fully understand the groovy {thing -> stuff} syntax yet, but I tried to do this:

thing = REHYDRATE.out.dataDir.map{"$it/*"}
thing.view()

and I get

/mnt/c/Users/mkozubov/Desktop/nextflow_tutorial/tRNA_stuff/work/d0/long_hash/ecoli/ncbi_dataset/data/*

printed... But when I feed this into a process that just has a script: println(input) I get an error saying that the command executed is null, the command output is (empty), and that target '*' is not a directory.

My question is why didn't the .map operator expand the * as entering "PATH/*" into a channel would've?




Edit2: I feel like I almost had something. I changed the output of the COLLECT_NAMES script to contain the path to the files. I now want to parse this file and read the contents into a channel. For that I did

organism_genome_files = Channel.from()
  COLLECT_NAMES.out.org_names.map {
    new File(it.toString()).eachLine { line ->
      organism_genome_files << line.split('\t')[3] }
  }

If I replace the organism_genome_files << line.split('\t')[3] with println line.split('\t')[3] I can see the content I want, but I can't seem to find a way of pulling this info out.

I also tried it with organism_genome_files as a list, but nothing seems to be working, I just can't seem to pull info from channels and effectively mutate it.

The .splitCsv() operator seems like it could be useful, but I still don't understand how to get a channel to work as an input to another channel :(

Upvotes: 1

Views: 2108

Answers (1)

Steve

Reputation: 54502

Is there a way to access/modify the contents of a Nextflow channel?

You can use one or more transforming operators for this. For example, to get the directory name and filename of 'relations.txt', you could use:

COLLECT_NAMES.out.org_names.map { tuple( it.parent, it.name ) }.view()

See also: Check file attributes



My question is why didn't the .map operator expand the * as entering "PATH/*" into a channel would've?

It's only been told to return a String (actually a GString). Groovy won't automatically expand this in the same way your shell would. I think what you want is some way to list the contents of that directory. For this you can use the listFiles() method:

REHYDRATE.out.dataDir.map { tuple( it.listFiles() ) }.view()

See also: List directory content
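
To make the distinction concrete, here is a minimal plain-Groovy sketch that runs outside Nextflow. Because of that it calls `toFile().listFiles()` on a throwaway temporary directory, and the two subdirectory names are simply borrowed from the question for illustration. It shows that interpolating a path into a string only produces text ending in `/*`, whereas listing the directory returns its actual entries:

```groovy
import java.nio.file.Files
import java.nio.file.Path

// Hypothetical stand-in for the 'data' directory emitted by REHYDRATE
Path dataDir = Files.createTempDirectory('data')
Files.createDirectory(dataDir.resolve('GCF_000005845.2'))
Files.createDirectory(dataDir.resolve('GCF_000008865.2'))

// Interpolation only builds text; nothing on disk is consulted
def pattern = "${dataDir}/*"
assert pattern instanceof GString
assert pattern.toString().endsWith('/*')

// Listing the directory is what actually returns its entries
def entries = dataDir.toFile().listFiles()*.name.sort()
assert entries == ['GCF_000005845.2', 'GCF_000008865.2']
```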



I changed the output of the COLLECT_NAMES script to contain the path to the files. I now want to parse this file and read the contents into a channel.

Without more details about what these files are, how big they are, how they are going to be used, and what the return type needs to be, I'm really only guessing here. So I've put together some potential solutions that might help get you started:

  1. This is implemented as a closure and returns a channel of lists:
def getOrganismGenomeFiles = { reader ->
    def values = []
    reader.splitEachLine('\t') { fields ->
        values.add( fields[3] )
    }
    return values
}

ch.map( getOrganismGenomeFiles ).view()
  2. This slurps the lines and also returns a channel of lists:
ch.map { it.readLines().collect { it.split('\t')[3] } }.view()
  3. This slurps the file contents, splits them into records using the splitCsv operator, and returns a channel of values:
ch.map { it.text }.splitCsv(sep: '\t').map { it[3] }.view()
  • Note: I shortened the input channel name for readability. Please replace ch with COLLECT_NAMES.out.org_names in the above examples.
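
Coming back to the original goal (root dir + subdirectory + filename), one possible sketch is a small closure that joins the three parts. It is written as plain Groovy so it can be checked on its own. The closure name `toGenomeFile`, the placeholder `/work/ecoli/ncbi_dataset/data` path, and the assumption that the subdirectory is in column 1 and the filename in column 3 (indices 0 and 2) of a tab-separated row are mine, based on the example .tsv in the question:

```groovy
import java.nio.file.Path
import java.nio.file.Paths

// Hypothetical helper: join the data directory, the subdirectory (column 1)
// and the FASTA filename (column 3) of one tab-separated row into a Path.
def toGenomeFile = { Path dataDir, List<String> row ->
    dataDir.resolve(row[0]).resolve(row[2])
}

// Placeholder path standing in for the directory emitted by REHYDRATE
Path dataDir = Paths.get('/work/ecoli/ncbi_dataset/data')
def line = 'GCF_000005845.2\tEscherichia coli str. K-12 substr. MG1655, complete genome\tGCF_000005845.2_ASM584v2_genomic.fna'

def genomeFile = toGenomeFile(dataDir, line.split('\t') as List)
assert genomeFile.fileName.toString() == 'GCF_000005845.2_ASM584v2_genomic.fna'
assert genomeFile.parent.fileName.toString() == 'GCF_000005845.2'

// Inside the workflow block this could be wired up roughly as follows
// (a sketch only; combine flattens each row next to the directory):
//   REHYDRATE.out.dataDir
//       .combine( COLLECT_NAMES.out.org_names.splitCsv(sep: '\t') )
//       .map { it -> toGenomeFile( it[0], it[1..-1] ) }
```

The resulting channel of paths can then be passed to a downstream process that declares a `path` input.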


My (maybe not so constructive) criticism actually regards the workflow design, not so much the style, layout, etc. My preference is and will always be to avoid using some web get command like curl, wget, or in this case NCBI Datasets, inside of a Nextflow process. Sure, you can make things work this way, but you'll ultimately run into problems when you later decide to share your workflow with others. Even if everyone agrees that wasting additional resources on downloading files is fine (which they won't, but maybe these costs are negligible in the scheme of things...), you can't necessarily guarantee that the machine or node your process lands on will even be able to resolve the specified URL(s). There are ways to work around these issues, but my advice is to just let Nextflow localize the required files. The problem is how. And this of course depends on what you're actually trying to do...

These files are available from the NCBI FTP Site and their URLs could be added to your configuration, perhaps something like:

params {
  genomes {
    'GCF_000005845.2_ASM584v2' {
      genomic_fna = 'ftp://ftp.ncbi.nlm.nih.gov/genomes/refseq/bacteria/Escherichia_coli/reference/GCF_000005845.2_ASM584v2/GCF_000005845.2_ASM584v2_genomic.fna.gz'
      genomic_gff = 'ftp://ftp.ncbi.nlm.nih.gov/genomes/refseq/bacteria/Escherichia_coli/reference/GCF_000005845.2_ASM584v2/GCF_000005845.2_ASM584v2_genomic.gff.gz'
      protein_faa = 'ftp://ftp.ncbi.nlm.nih.gov/genomes/refseq/bacteria/Escherichia_coli/reference/GCF_000005845.2_ASM584v2/GCF_000005845.2_ASM584v2_protein.faa.gz'
    }
    'GCF_000008865.2_ASM886v2' {
      genomic_fna = 'ftp://ftp.ncbi.nlm.nih.gov/genomes/refseq/bacteria/Escherichia_coli/reference/GCF_000008865.2_ASM886v2/GCF_000008865.2_ASM886v2_genomic.fna.gz'
      genomic_gff = 'ftp://ftp.ncbi.nlm.nih.gov/genomes/refseq/bacteria/Escherichia_coli/reference/GCF_000008865.2_ASM886v2/GCF_000008865.2_ASM886v2_genomic.gff.gz'
      protein_faa = 'ftp://ftp.ncbi.nlm.nih.gov/genomes/refseq/bacteria/Escherichia_coli/reference/GCF_000008865.2_ASM886v2/GCF_000008865.2_ASM886v2_protein.faa.gz'
    }
  }
}

Then to access the files for a given genome, use something like:

genome = 'GCF_000005845.2_ASM584v2'

genomic_fna = params.genomes[genome].genomic_fna
genomic_gff = params.genomes[genome].genomic_gff
protein_faa = params.genomes[genome].protein_faa
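
As a rough illustration of walking over all configured genomes rather than a single one, the sketch below uses a plain Groovy map as a stand-in for `params.genomes` (inside a pipeline you would use `params.genomes` itself). The `example.org` URLs are shortened placeholders, not real locations. It collects one `[name, url]` pair per genome:

```groovy
// Plain map standing in for params.genomes; the URLs are placeholders
def genomes = [
    'GCF_000005845.2_ASM584v2': [genomic_fna: 'ftp://example.org/GCF_000005845.2_ASM584v2_genomic.fna.gz'],
    'GCF_000008865.2_ASM886v2': [genomic_fna: 'ftp://example.org/GCF_000008865.2_ASM886v2_genomic.fna.gz'],
]

// One [name, url] pair per configured genome, in declaration order
def fastaPairs = genomes.collect { name, files -> [name, files.genomic_fna] }

assert fastaPairs.size() == 2
assert fastaPairs[0][0] == 'GCF_000005845.2_ASM584v2'
```

In a workflow, pairs like these could be turned into a channel with `Channel.fromList(...)`, wrapping each URL in `file(...)` so that Nextflow stages the remote file for the process that consumes it.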

Upvotes: 2
