Cris Tuñí

Reputation: 107

Nextflow: how should I truncate a fastq file at line X? Process fails with error 141

I'm trying to create a process where a gzipped fastq file is truncated at 900k lines. If the file has fewer than 900k lines, the code should run but leave the file unmodified; if it has more than 900k lines, the new file should contain only the first 900k lines. I've tried this with sed and with head, piping the zcat output of the file into one of them, but the process fails with error code 141, which, from what I've found, has to do with pipes.

My code, using head and paired-end reads, is as follows:

process TRUNCATE_FASTQ {

    input:
    set val(sample), val(single_end), path(reads) from ch_reads

    output:
    path foo into ch_output

    script:
    """
    zcat ${reads[0]} | head -900000 > truncated_file_1
    gzip truncated_file_1
    mv truncated_file_1.gz truncated_${reads[0]}
    zcat ${reads[1]} | head -900000 > truncated_file_2
    gzip truncated_file_2
    mv truncated_file_2.gz truncated_${reads[1]}
    """
}

My code using sed is as follows (inputs and outputs are the same; only the script changes):

    script:
    """
    sed -i -n '1,900000p' ${reads[0]}
    sed -i -n '1,900000p' ${reads[1]}
    """

So my question is: how should I modify and truncate the fastq file inside the process? My aim is simply to run the rest of the pipeline after this process with a smaller fastq file.

Thanks

Upvotes: 2

Views: 377

Answers (2)

Steve

Reputation: 54562

You might have added something like the following to your nextflow.config:

process {

  shell = [ '/bin/bash', '-euo', 'pipefail' ]
}

You get an error because zcat has had its destination (the pipe) closed early and wasn't able to (successfully) finish writing the decompressed output. It has no way of knowing whether this was intentional or whether something bad happened (e.g. out of disk space, a network error, etc.). Under these shell options, the pipeline's exit status becomes zcat's, and a process killed by SIGPIPE (signal 13) exits with 128 + 13 = 141.
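For illustration, here is a minimal way to reproduce the failure outside of Nextflow (a rough sketch; test.fastq.gz is a placeholder name and is assumed to decompress to well over 900000 lines, so zcat is still writing when head exits):

#!/usr/bin/env bash
# Same shell options that Nextflow applies with the config above.
set -euo pipefail

# head exits after 900000 lines and closes the pipe; zcat is then
# killed by SIGPIPE (signal 13), so the pipeline's exit status is
# 128 + 13 = 141 and the script aborts on this line.
zcat test.fastq.gz | head -n 900000 > truncated_file

echo "never reached"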

If you'd rather not make changes to your shell options, you could instead use a tool that reads the entire input stream. This is what your awk solution does, which is the same as:

zcat test.fastq.gz | awk 'NR<=900000 { print $0 }'

For each input record, AWK tests the current record number (NR) to see whether it is less than or equal to 900000. If it is, the whole line is printed to stdout; if it isn't, AWK simply moves on and tests the next record. The problem with this solution is that it reads the entire input stream, so it will be slow if your input FASTQ is really big. So you might be tempted to use:

zcat test.fastq.gz | awk 'NR>900000 { exit } { print }'

But this behaves much like the original head solution: it exits before reading the whole stream, closes the pipe early, and produces the same error with your current shell options. A better solution (which lets you keep your current shell options) is to use process substitution to avoid the pipe altogether:

head -n 900000 < <(zcat test.fastq.gz)
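Here zcat is no longer part of a pipeline, so pipefail has nothing to report even if zcat is terminated early; the command's exit status is simply head's. A rough check under the same shell options (again with a placeholder filename):

#!/usr/bin/env bash
set -euo pipefail

# No pipeline: the exit status is head's (0), even though the
# asynchronous zcat may still receive SIGPIPE when head finishes.
head -n 900000 < <(zcat test.fastq.gz) > truncated_file

echo "still running"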

Your Nextflow code might look like:

nextflow.enable.dsl=2


params.max_lines = 900000

process TRUNCATE_FASTQ {

    input:
    tuple val(sample), path(reads)

    script:
    def (fq1, fq2) = reads

    """
    head -n ${params.max_lines} < <(zcat "${fq1}") | gzip > "truncated_${fq1}"
    head -n ${params.max_lines} < <(zcat "${fq2}") | gzip > "truncated_${fq2}"
    """
}

workflow {

    TRUNCATE_FASTQ( Channel.fromFilePairs('*.{1,2}.fastq.gz') )
}

If you have Python, which has native gzip support, you could also use pysam for this. For example:

params.max_records = 225000

process TRUNCATE_FASTQ {

    container 'quay.io/biocontainers/pysam:0.16.0.1--py37hc334e0b_1'

    input:
    tuple val(sample), path(reads)

    script:
    def (fq1, fq2) = reads

    """
    #!/usr/bin/env python
    import gzip
    import pysam

    def truncate(fn, num):
        with pysam.FastxFile(fn) as ifh:
            with gzip.open(f'truncated_{fn}', 'wt') as ofh:
                for idx, entry in enumerate(ifh):
                    if idx >= num:
                        break
                    print(str(entry), file=ofh)

    truncate('${fq1}', ${params.max_records})
    truncate('${fq2}', ${params.max_records})
    """
}

Or using Biopython:

params.max_records = 225000

process TRUNCATE_FASTQ {

    container 'quay.io/biocontainers/biopython:1.78'

    input:
    tuple val(sample), path(reads)

    script:
    def (fq1, fq2) = reads

    """
    #!/usr/bin/env python
    import gzip
    from itertools import islice
    from Bio import SeqIO

    def xopen(fn, mode='r'):
        if fn.endswith('.gz'):
            return gzip.open(fn, mode) 
        else:
            return open(fn, mode)

    def truncate(fn, num):
        with xopen(fn, 'rt') as ifh, xopen(f'truncated_{fn}', 'wt') as ofh:
            seqs = islice(SeqIO.parse(ifh, 'fastq'), num)
            SeqIO.write(seqs, ofh, 'fastq')

    truncate('${fq1}', ${params.max_records})
    truncate('${fq2}', ${params.max_records})
    """
}

Upvotes: 3

Cris Tuñí

Reputation: 107

So!

After some testing it seems that, as @tripleee said, head was triggering an error signal (SIGPIPE).

I do not fully understand the reasons behind this, but this answer, found in another question, seems to have solved my problem at least.

So the final code is something like this:

process TRUNCATE_FASTQ {

    input:
    set val(sample), val(single_end), path(reads) from ch_reads

    output:
    path foo into ch_output

    script:
    """
    zcat ${reads[0]} | awk '(NR<=900000)' > truncated_file_1
    gzip truncated_file_1
    """
}

Upvotes: 0
