Reputation: 3711
My current architecture is that at the start of my Snakefile I have a long running function somefunc
which helps decide the "input" to rule all
. I realized when I was running the workflow with slurm that somefunc
is being executed by each job. Is there some variable I can access that defines whether the code is a submitted job or whether it is the main process:
if not snakemake.submitted_job:
config['layout'] = somefunc()
...
Upvotes: 1
Views: 167
Reputation: 3711
As discussed with @dariober it seems the cleanest to check whether the (hidden) snakemake directory has locks since they seem not to be generated until the first rule starts (assuming you are not using the --nolock argument).
import os
locked = len(os.listdir(".snakemake/locks")) > 0
However this results in a problem in my case:
import time
import os
def longfunc():
time.sleep(10)
return range(5)
locked = len(os.listdir(".snakemake/locks")) > 0
if not locked:
info = longfunc()
rule all:
input:
expand("test_{sample}", sample=info)
rule test:
output:
touch("test_{sample}")
run:
"""
sleep 1
"""
Somehow snakemake lets each rule reinterpret the complete snakefile, with the issue that all the jobs will complain that 'info is not defined'. For me it was easiest to store the results and load them for each job (pickle.dump and pickle.load).
Upvotes: 1
Reputation: 9062
A solution which I don't really recommend is to make somefunc
write the list of inputs to a tmp file so that slurm jobs will read this tmp file rather than reconstructing the list from scratch. The tmp file is created by whatever job is executed first so the long-running part is done only once.
At the end of the workflow delete the tmp file so that later executions will start fresh with new input.
Here's a sketch:
def somefunc():
try:
all_output = open('tmp.txt').readlines()
all_output = [x.strip() for x in all_output]
print('List of input files read from tmp.txt')
except:
all_output = ['file1.txt', 'file2.txt'] # Long running part
with open('tmp.txt', 'w') as fout:
for x in all_output:
fout.write(x + '\n')
print('List of input files created and written to tmp.txt')
return all_output
all_output = somefunc()
rule all:
input:
all_output,
rule one:
output:
all_output,
shell:
r"""
touch {output}
"""
onsuccess:
os.remove('tmp.txt')
onerror:
os.remove('tmp.txt')
Since jobs will be submitted in parallel, you should make sure that only one job writes tmp.txt
and the others read it. I think the try/except above will do it but I'm not 100% sure. (Probably you want to use some better filename than tmp.txt
, see the module tempfile. see also the module atexit) for exit handlers)
Upvotes: 1