Reputation: 923
My problem is related to Running parallel instances of a single job/rule on Snakemake, but I believe it is different.
I cannot create an all: rule for it in advance, because the folder of input files will be created by a previous rule and depends on the user's initial data.
Pseudocode:
- rule1: get a big file (OK)
- rule2: split the file into parts in a Split folder (OK)
- rule3: run a program on each file created in Split

I am now at rule3, with Split containing 70 files like Split/file_001.fq, Split/file_002.fq, .., Split/file_069.fq.
Could you please help me create a rule for pigz to compress the 70 files in parallel into 70 .gz files?
I am running with snakemake -j 24 ZipSplit.
config["pigt"] gives 4 threads for each compression job and I give 24 threads to snakemake, so I expect 6 parallel compressions, but my current rule merges all the inputs into a single job instead of parallelizing!?
Should I build the full list of inputs inside the rule? How?
# parallel job
files, = glob_wildcards("Split/{x}.fq")

rule ZipSplit:
    input: expand("Split/{x}.fq", x=files)
    threads: config["pigt"]
    shell:
        """
        pigz -k -p {threads} {input}
        """
I tried to define the input directly with
input: glob_wildcards("Split/{x}.fq")
but a syntax error occurs.
# InSilico_PCR Snakefile
import os
import re

from snakemake.remote.HTTP import RemoteProvider as HTTPRemoteProvider
HTTP = HTTPRemoteProvider()

# source config variables
configfile: "config.yaml"

# single job
rule GetRawData:
    input:
        HTTP.remote(os.path.join(config["host"], config["infile"]), keep_local=True, allow_redirects=True)
    output:
        os.path.join("RawData", config["infile"])
    run:
        shell("cp {input} {output}")

# single job
rule SplitFastq:
    input:
        os.path.join("RawData", config["infile"])
    params:
        lines_per_file = config["lines_per_file"]
    output:
        pfx = os.path.join("Split", config["infile"] + "_")
    shell:
        """
        zcat {input} | split --numeric-suffixes --additional-suffix=.fq -a 3 -l {params.lines_per_file} - {output.pfx}
        """

# parallel job
files, = glob_wildcards("Split/{x}.fq")

rule ZipSplit:
    input: expand("Split/{x}.fq", x=files)
    threads: config["pigt"]
    shell:
        """
        pigz -k -p {threads} {input}
        """
Upvotes: 1
Views: 730
Reputation: 9062
I think the example below should do it, using checkpoints as suggested by @Maarten-vd-Sande.
However, in your particular case of splitting a big file and compressing the output on the fly, you may be better off using the --filter option of split, as in:
split -a 3 -d -l 4 --filter='gzip -c > $FILE.fastq.gz' bigfile.fastq split/
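Adapted to your Snakefile, splitting and compression could then happen in one rule, and a separate ZipSplit rule would no longer be needed. This is only a rough sketch reusing your config keys; the rule name SplitAndZip and the directory() output are placeholders to adjust as needed:

rule SplitAndZip:
    input:
        os.path.join("RawData", config["infile"])
    params:
        lines_per_file = config["lines_per_file"],
        pfx = os.path.join("Split", config["infile"] + "_")
    output:
        directory("Split")
    shell:
        """
        mkdir -p {output}
        # split pipes each chunk through the --filter command,
        # so the compressed .fq.gz files appear directly in Split/
        zcat {input} | split --numeric-suffixes -a 3 -l {params.lines_per_file} \
            --filter='gzip -c > $FILE.fq.gz' - {params.pfx}
        """

Each chunk is compressed as it is written, so there is nothing left to parallelize afterwards.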
The snakemake solution with checkpoints follows; assuming your input file is called bigfile.fastq, the split and compressed output will be in the directory splitting/bigfile/:
rule all:
    input:
        expand("{sample}.split.done", sample=['bigfile']),

checkpoint splitting:
    input:
        "{sample}.fastq"
    output:
        directory("splitting/{sample}")
    shell:
        r"""
        mkdir splitting/{wildcards.sample}
        split -a 3 -d --additional-suffix .fastq -l 4 {input} splitting/{wildcards.sample}/
        """

rule compress:
    input:
        "splitting/{sample}/{i}.fastq",
    output:
        "splitting/{sample}/{i}.fastq.gz",
    shell:
        r"""
        gzip -c {input} > {output}
        """

# collect the compressed chunks found in the splitting checkpoint's output directory
def aggregate_input(wildcards):
    checkpoint_output = checkpoints.splitting.get(**wildcards).output[0]
    return expand("splitting/{sample}/{i}.fastq.gz",
                  sample=wildcards.sample,
                  i=glob_wildcards(os.path.join(checkpoint_output, "{i}.fastq")).i)

rule all_done:
    input:
        aggregate_input
    output:
        touch("{sample}.split.done")
Upvotes: 1