Maarten-vd-Sande

Reputation: 3711

Snakemake: variable that defines whether a process is a submitted cluster job or the main Snakefile

My current setup is that, at the start of my Snakefile, I call a long-running function somefunc that helps decide the input to rule all. When running the workflow with SLURM, I noticed that somefunc is executed by every submitted job, not just once. Is there some variable I can check that tells me whether the code is running in a submitted job or in the main process? Something like:

if not snakemake.submitted_job:
    config['layout'] = somefunc()

...

Upvotes: 1

Views: 167

Answers (2)

Maarten-vd-Sande

Reputation: 3711

As discussed with @dariober, it seems cleanest to check whether the hidden .snakemake directory contains locks, since these do not seem to be generated until the first rule starts (assuming you are not using the --nolock argument).

import os
locked = len(os.listdir(".snakemake/locks")) > 0

However this results in a problem in my case:

import time
import os


def longfunc():
    time.sleep(10)
    return range(5)

locked = len(os.listdir(".snakemake/locks")) > 0
if not locked:
    info = longfunc()


rule all:
    input:
        expand("test_{sample}", sample=info)



rule test:
    output:
        touch("test_{sample}")
    shell:
        """
        sleep 1
        """

It turns out Snakemake re-evaluates the complete Snakefile for each submitted job, so all the jobs complain that 'info' is not defined. For me the easiest fix was to store the result on disk in the main process and load it in each job (pickle.dump and pickle.load), as in the sketch below.
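A minimal sketch of that pickle-based workaround, building on the lock check above (layout.pkl is just an example filename, not anything Snakemake-specific):

import os
import pickle

locked = len(os.listdir(".snakemake/locks")) > 0

if not locked:
    # main process: run the expensive function once and cache the result
    info = longfunc()
    with open("layout.pkl", "wb") as f:
        pickle.dump(info, f)
else:
    # submitted job: the Snakefile is re-evaluated, so load the cached result
    with open("layout.pkl", "rb") as f:
        info = pickle.load(f)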

Upvotes: 1

dariober

Reputation: 9062

A solution which I don't really recommend is to make somefunc write the list of inputs to a tmp file, so that SLURM jobs read this tmp file rather than reconstructing the list from scratch. The tmp file is created by whichever job is executed first, so the long-running part is done only once.

At the end of the workflow delete the tmp file so that later executions will start fresh with new input.

Here's a sketch:

import os

def somefunc():
    try:
        all_output = open('tmp.txt').readlines()
        all_output = [x.strip() for x in all_output]
        print('List of input files read from tmp.txt')
    except FileNotFoundError:
        all_output = ['file1.txt', 'file2.txt'] # Long running part
        with open('tmp.txt', 'w') as fout:
            for x in all_output:
                fout.write(x + '\n')
        print('List of input files created and written to tmp.txt')
    return all_output

all_output = somefunc()

rule all:
    input:
        all_output,

rule one:
    output:
        all_output,
    shell:
        r"""
        touch {output}
        """

onsuccess:
    os.remove('tmp.txt')
onerror:
    os.remove('tmp.txt')

Since jobs will be submitted in parallel, you should make sure that only one job writes tmp.txt and the others read it. I think the try/except above will do it, but I'm not 100% sure. (You probably want a better filename than tmp.txt; see the tempfile module, and also the atexit module for exit handlers.)
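If you are worried about two jobs writing the file at the same time, one option (just a sketch, the write_inputs_atomically helper is hypothetical, not part of Snakemake) is to write to a temporary file and atomically rename it into place with os.replace, so readers never see a half-written file:

import os
import tempfile

def write_inputs_atomically(all_output, path='tmp.txt'):
    # Write to a temporary file in the same directory, then atomically
    # rename it into place so other jobs never read a partial file.
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(path) or '.')
    with os.fdopen(fd, 'w') as fout:
        for x in all_output:
            fout.write(x + '\n')
    os.replace(tmp_path, path)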

Upvotes: 1
