splaisan

Snakemake: running a single rule in parallel over all files in a folder

My problem is related to "Running parallel instances of a single job/rule on Snakemake", but I believe it is different.

I cannot create an all: rule for it in advance, because the folder of input files is created by a previous rule and its contents depend on the user's initial data.

pseudocode

I am now at rule3, with the Split folder containing 70 files such as Split/file_001.fq, Split/file_002.fq, ..., Split/file_069.fq.

Could you please help me create a rule that runs pigz to compress the 70 files in parallel into 70 .gz files?

I am running it with snakemake -j 24 ZipSplit

config["pigt"] gives 4 threads to each compression job and I give 24 threads to snakemake, so I expect 6 parallel compressions, but my current rule passes all the inputs to pigz in a single job instead of parallelizing!?
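The expected parallelism follows from simple arithmetic, sketched below in plain Python. The function names are hypothetical and the "waves" figure is a simplification: Snakemake starts a new job as soon as enough cores are free, rather than in strict batches. The numbers only apply once each file is its own job; a rule that lists every file in its input remains one job however many cores are available.

```python
import math


def max_concurrent_jobs(total_cores: int, threads_per_job: int) -> int:
    """Upper bound on jobs that fit into the core budget at the same time."""
    return max(1, total_cores // threads_per_job)


def scheduling_waves(n_jobs: int, total_cores: int, threads_per_job: int) -> int:
    """Rough number of batches needed if all jobs took equally long."""
    return math.ceil(n_jobs / max_concurrent_jobs(total_cores, threads_per_job))


# Values from the question: -j 24, 4 threads per pigz job, 70 split files.
print(max_concurrent_jobs(24, 4))   # 6
print(scheduling_waves(70, 24, 4))  # 12
```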

Should I build the full list of inputs inside the rule? If so, how?

# parallel job
files, = glob_wildcards("Split/{x}.fq")

rule ZipSplit:
    input: expand("Split/{x}.fq", x=files)
    threads: config["pigt"]
    shell: 
      """
      pigz -k -p {threads} {input}
      """

I tried to define input directly with

input: glob_wildcards("Split/{x}.fq")

but a syntax error occurs.
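One likely reason the direct form fails is that glob_wildcards returns the matched wildcard values (here the x part of each name), not file paths, so the paths still have to be rebuilt from those values. The sketch below imitates that behaviour with the standard library only; stems_matching is a hypothetical stand-in, not Snakemake's implementation, and the file names are made up for the demonstration.

```python
import re
import tempfile
from pathlib import Path


def stems_matching(directory: Path, suffix: str = ".fq") -> list:
    """Hypothetical stand-in for glob_wildcards("Split/{x}.fq").x.

    Returns the wildcard values (file name stems), not the file paths.
    """
    pattern = re.compile(r"(?P<x>.+)" + re.escape(suffix) + r"$")
    stems = []
    for path in directory.iterdir():
        match = pattern.match(path.name)
        if match:
            stems.append(match.group("x"))
    return sorted(stems)


with tempfile.TemporaryDirectory() as tmp:
    split_dir = Path(tmp) / "Split"
    split_dir.mkdir()
    # Two split files plus one unrelated file that must be ignored.
    for name in ("file_001.fq", "file_002.fq", "notes.txt"):
        (split_dir / name).touch()

    stems = stems_matching(split_dir)
    # Paths have to be rebuilt from the stems, which is what expand() does.
    paths = [f"Split/{x}.fq" for x in stems]

print(stems)  # ['file_001', 'file_002']
print(paths)  # ['Split/file_001.fq', 'Split/file_002.fq']
```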

# InSilico_PCR Snakefile

import os
import re
from snakemake.remote.HTTP import RemoteProvider as HTTPRemoteProvider

HTTP = HTTPRemoteProvider()

# source config variables
configfile: "config.yaml"


# single job
rule GetRawData:
    input:
      HTTP.remote(os.path.join(config["host"], config["infile"]), keep_local=True, allow_redirects=True)
    output:
      os.path.join("RawData", config["infile"])
    run:
      shell("cp {input} {output}")


# single job
rule SplitFastq:
    input:
      os.path.join("RawData", config["infile"])
    params:
      lines_per_file =  config["lines_per_file"]
    output:
      pfx = os.path.join("Split", config["infile"] + "_")
    shell:
      """
      zcat {input} | split --numeric-suffixes --additional-suffix=.fq -a 3 -l {params.lines_per_file} - {output.pfx}
      """

# parallel job
files, = glob_wildcards("Split/{x}.fq")
rule ZipSplit:
    input: expand("Split/{x}.fq", x=files)
    threads: config["pigt"]
    shell: 
      """
      pigz -k -p {threads} {input}
      """


Answers (1)

dariober

I think the example below should do it, using checkpoints as suggested by @Maarten-vd-Sande.

However, in your particular case of splitting a big file and compressing the output on the fly, you may be better off using the --filter option of split, as in

split -a 3 -d -l 4 --filter='gzip -c > $FILE.fastq.gz' bigfile.fastq split/

The Snakemake solution follows. It assumes your input file is called bigfile.fastq; the split and compressed output will be in the directory splitting/bigfile/

rule all:
    input:
        expand("{sample}.split.done", sample=["bigfile"]),

checkpoint splitting:
    input:
        "{sample}.fastq"
    output:
        directory("splitting/{sample}")
    shell:
        r"""
        mkdir -p splitting/{wildcards.sample}
        split -a 3 -d --additional-suffix .fastq -l 4 {input} splitting/{wildcards.sample}/
        """

rule compress:
    input:
        "splitting/{sample}/{i}.fastq",
    output:
        "splitting/{sample}/{i}.fastq.gz",
    shell:
        r"""
        gzip -c {input} > {output}
        """


def aggregate_input(wildcards):
    checkpoint_output = checkpoints.splitting.get(**wildcards).output[0]
    return expand("splitting/{sample}/{i}.fastq.gz",
           sample=wildcards.sample,
           i=glob_wildcards(os.path.join(checkpoint_output, "{i}.fastq")).i)

rule all_done:
    input:
        aggregate_input
    output:
        touch("{sample}.split.done")
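The key step is the aggregate function: it runs only after the checkpoint has finished, lists whatever chunks the split actually produced, and turns each one into a .fastq.gz target, so that compress becomes one job per chunk. Below is a minimal plain-Python sketch of that mapping, without Snakemake. compressed_targets is a hypothetical helper, and the three empty chunk files are created only for the demonstration.

```python
import tempfile
from pathlib import Path


def compressed_targets(split_dir: Path, sample: str) -> list:
    """List one .fastq.gz target per .fastq chunk found in split_dir."""
    suffix = ".fastq"
    # Equivalent in spirit to glob_wildcards(".../{i}.fastq").i
    chunk_ids = sorted(p.name[: -len(suffix)] for p in split_dir.glob("*" + suffix))
    return [f"splitting/{sample}/{i}.fastq.gz" for i in chunk_ids]


with tempfile.TemporaryDirectory() as tmp:
    split_dir = Path(tmp) / "splitting" / "bigfile"
    split_dir.mkdir(parents=True)
    # Pretend the checkpoint produced three chunks: 000, 001, 002.
    for i in range(3):
        (split_dir / f"{i:03d}.fastq").touch()

    targets = compressed_targets(split_dir, "bigfile")

print(targets)
# ['splitting/bigfile/000.fastq.gz', 'splitting/bigfile/001.fastq.gz', 'splitting/bigfile/002.fastq.gz']
```

Because each target is a separate file matched by the compress rule, the scheduler is free to run as many of these jobs at once as the -j budget allows.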
