Reputation: 1
I'm developing a Snakemake workflow to efficiently process genomic data by splitting a large exon coordinate file (a parsed GTF/GFF3) into smaller batches, performing parallel processing on these batches to extract intron coordinates, and then merging the results.
My goal is to fully utilize the server's resources to enhance processing speed. For context, my project requires the processing of many genomes and extracting intron coordinate data is just the first step. The full workflow will utilize multiple similar checkpoints, so it's important for me to figure this out. However, I'm encountering difficulties implementing parallel batching using Snakemake.
I've attempted to structure my workflow using a checkpoint that splits files into batches, as outlined below:
NUM_BATCHES = config.get('batch_size', 10000)
PATH = config['path']
rule all:
input:
os.path.join(PATH, "intron.bed")
# Checkpoint for splitting exon.info file into batches
checkpoint split_exon_batches:
input:
exon_info = os.path.join(PATH, "exon.info")
output:
bedfile_dir = directory(os.path.join(PATH, "BedFiles"))
shell:
"python3 determine_bedfile_batches.py {input.exon_info} {NUM_BATCHES}"
This effectively produces exon_1.info, exon_2.info, etc. which then need to be passed to a script that generates intron bedfiles. Subsequently, the generated intron bedfiles are merged.
# Function to aggregate the paths of exon batch files created by split_exon_batches
def aggregate_exon_batches(wildcards):
checkpoint_output = checkpoints.split_exon_batches.get(**wildcards).output[0]
exon_files = glob_wildcards(os.path.join(checkpoint_output, "exon_{batch_id}.info")).batch_id
return expand(os.path.join(checkpoint_output, "exon_{batch_id}.info"), batch_id=exon_files)
# Function to aggregate the paths of intron files created by generate_bedfiles
def aggregate_intron_bedfiles(wildcards):
checkpoint_output = checkpoints.split_exon_batches.get(**wildcards).output[0]
intron_files = glob_wildcards(os.path.join(checkpoint_output, "intron_{batch_id}.bed")).batch_id
return expand(os.path.join(checkpoint_output, "intron_{batch_id}.bed"), batch_id=intron_files)
# Rule to process exon batches and generate intron bedfiles
rule generate_bedfiles:
input:
aggregate_exon_batches
output:
touch(os.path.join(PATH, "bedfiles_generated"))
shell:
"python3 generate_splice_sites.py {input}"
# Rule to merge intron bedfiles
rule merge_bedfiles:
input:
aggregate_intron_bedfiles,
os.path.join(PATH, "bedfiles_generated")
output:
os.path.join(PATH, "intron.bed"),
shell:
"""
cat {input} > {output}
"""
I can't seem to figure out how to ensure the files returned by aggreate_exon_batches are processed in parallel by Snakemake. The checkpoint itself runs just fine and the aggregate_exon_batches function finds the produced files, but I can't seem to figure out how to get each batch file to run in parallel. Current error message:
Error in rule generate_bedfiles:
jobid: 3
input: PATH/BedFiles/exon_3.info, PATH/BedFiles/exon_2.info, PATH/BedFiles/exon_1.info
output: PATH/bedfiles_generated
shell:
python3 generate_splice_sites.py PATH/BedFiles/exon_3.info PATH/BedFiles/exon_2.info PATH/BedFiles/exon_1.info
(one of the commands exited with non-zero exit code; note that snakemake uses bash strict mode!)
Am I on the right track? If so, what am I missing to get this to run appropriately? Is batching (as I've described) even possible with Snakemake? I'd greatly appreciate any guidance!
Upvotes: 0
Views: 35