JLDiaz

Reputation: 1453

Snakemake. Set of experiments to be run sequentially on different resources

Excuse the length of the question - the problem is a bit complex and I think it is interesting, so I wanted to illustrate it clearly and in full detail, providing working code that can be tested, including my (not very fruitful) attempts.

The problem

Assume that I have a set of experiments to run on different virtual machines with different characteristics, for example to determine the runtime of different workloads. On each machine there is a set of experiments that depends on other parameters which vary the workload, such as the name of the application to run, the size of the problem to solve, etc.

To simplify, assume only the following parameters that define each experiment:

  1. The type of the virtual machine (eg: small, medium, large)
  2. The name of the application to run (eg: eratostenes, fibonacci)
  3. The size of the problem to solve (eg: 100, 500, 1000)

With these parameters, the number of experiments to run is 3 x 2 x 3 = 18

As a result I want 18 files, each one containing different statistics about the experiment. The file names will use the pattern VM_{type}_APP_{app_name}_SIZE_{size}.data, for example "VM_small_APP_eratostenes_SIZE_100.data" and so on.

Naive approach

So far, so good. I can easily write a Snakefile that performs these tasks:

vm_types = ["small", "medium", "large"]
applications = ["eratostenes", "fibonacci"]
sizes = [100, 500, 1000]
all_files = expand("VM_{type}_APP_{app_name}_SIZE_{size}.data", type=vm_types, app_name=applications, size=sizes)

def perform_experiment(params):
    print("Faking experiment with params", params)
    with open("VM_{type}_APP_{app_name}_SIZE_{size}.data".format(**params), "w") as f:
        f.write("Dummy data")

rule all:
    input: all_files

rule perform_experiment:
    output: "VM_{type}_APP_{app_name}_SIZE_{size}.data"
    run:
        perform_experiment(wildcards)
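For reference, I can then produce all 18 output files with a single invocation, for example (the number of cores is arbitrary here):

$ snakemake --cores 4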

In fact the problem is slightly more complex, because the VMs required for the experiments have to be created and started before the experiments can take place. So I need to add extra tasks for starting up the virtual machines, and include dependencies among tasks. For example:

vm_types = ["small", "medium", "large"]
applications = ["eratostenes", "fibonacci"]
sizes = [100, 500, 1000]
all_files = expand("VM_{type}_APP_{app_name}_SIZE_{size}.data", type=vm_types, app_name=applications, size=sizes)

def perform_experiment(params):
    print("Faking experiment with params", params)
    with open("VM_{type}_APP_{app_name}_SIZE_{size}.data".format(**params), "w") as f:
        f.write("Dummy data")

def start_vm(t):
    print("Starting VM", t)

rule all:
    input: all_files

rule perform_experiment:
    input: "VM_{type}_start.done"
    output: "VM_{type}_APP_{app_name}_SIZE_{size}.data"
    run:
        perform_experiment(wildcards)

rule start_vm:
    output: touch("VM_{type}_start.done")
    run:
        start_vm(wildcards.type)

The real problem

The problem is that the DAG of this workflow has no dependencies between VMs, so in principle all experiments could be done in parallel. Even if I enforce --jobs 1 to avoid parallelism, since the experiments are independent, Snakemake can choose to start with any experiment on any of the VM types, then perform the next experiment on a different VM type, and so on.

This is not what I want. Assume that I don't have the resources to run several VMs in parallel, so I want to perform the tests on each VM sequentially. In addition, starting up and shutting down a machine takes time, so I want to take advantage of the fact that a VM is already started to perform all the required experiments on that machine. That is:

For each virtual machine type:
  - Set up that VM
  - Perform all experiments that must be done on that VM
  - Stop that VM

What would be the best way to accomplish this?

My attempt

I thought about introducing additional tasks to stop the VM, which would also produce a file like "VM_small_stop.done", and making the start-up of the next VM type depend on the stopping of the previous one. In addition, the "stop_vm" task can only run after all experiments on that VM are done, so I need to introduce dependencies between "stop_vm" and "perform_experiment".

Also, I need to "manually" delete the file VM_{type}_start.done when a machine is stopped, as part of the code which stops it (is there a way to specify as output the absence of a file, so that Snakemake deletes it? Kind of touch(filename) but in reverse... delete(filename)).

And I also need to add to the rule all a dependency on the stop of the last machine, or otherwise that last machine will not be stopped.

This quickly makes the Snakefile very complex, for example:

vm_types = ["small", "medium", "large"]
applications = ["eratostenes", "fibonacci"]
sizes = [100, 500, 1000]
all_files = expand("VM_{type}_APP_{app_name}_SIZE_{size}.data", type=vm_types, app_name=applications, size=sizes)

def perform_experiment(params):
    print("Faking experiment with params", params)
    with open("VM_{type}_APP_{app_name}_SIZE_{size}.data".format(**params), "w") as f:
        f.write("Dummy data")

def start_vm(t):
    print("Starting VM", t)

import os
def stop_vm(t):
    print("Stopping VM", t)
    os.unlink("VM_{}_start.done".format(t))

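# Chain the VM types: the start of each VM type depends on the stop marker of
# the previous one, so the VMs are handled strictly in the order of vm_types.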
def previous_vm_type(wildcards):
    if wildcards.type == "small": return []
    return "VM_{}_stop.done".format(vm_types[vm_types.index(wildcards.type)-1])

rule all:
    input: all_files, "VM_{}_stop.done".format(vm_types[-1])

rule perform_experiment:
    input: "VM_{type}_start.done"
    output: "VM_{type}_APP_{app_name}_SIZE_{size}.data"
    run:
        perform_experiment(wildcards)

rule start_vm:
    input: previous_vm_type
    output: touch("VM_{type}_start.done")
    run:
        start_vm(wildcards.type)

rule stop_vm:
    input: expand("VM_{type}_APP_{app_name}_SIZE_{size}.data", app_name=applications, size=sizes, allow_missing=True)
    output: touch("VM_{type}_stop.done")
    run:
        stop_vm(wildcards.type)

This kind of works. It produces the following DAG, which looks OK:

[DAG image]

But nevertheless it has some problems:

  1. The approach looks clumsy. There must be a better way!
  2. If I want to re-run one of the experiments, say for example VM_large_APP_eratostenes_SIZE_500.data, how should I proceed? (See the command below.)
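For concreteness, what I would naively try is to ask for that file directly:

$ snakemake --jobs 1 VM_large_APP_eratostenes_SIZE_500.data

but, as far as I can tell, this has to re-run start_vm for the large VM (its VM_large_start.done marker was deleted when that VM was stopped), and the refreshed marker then makes all the other large experiments look out of date on the next full run.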

The question

So my "solution" defeates the purpose of using snakemake in the first place. If I want to re-run only one experiment, but this Snakefile will cause to run all of them anyway, better I would have written a sequential python script.

So, what would be a good way to address this problem using Snakemake?

Upvotes: 1

Views: 434

Answers (1)

Jianyu

Reputation: 371

I think I need to include some code to show my idea; it is similar to your posted design. I deleted the start_vm rule so that each perform_experiment job can start the VM by itself, and I made the stop_vm of the previous VM type a dependency of each perform_experiment (also adding the ancient() flag, so that refreshing that marker does not lead to re-running all the following experiments). It still runs all experiments on the "small" machine first, then "medium", then "large".

vm_types = ["small", "medium", "large"]
applications = ["eratostenes", "fibonacci"]
sizes = [100, 500, 1000]
all_files = expand("VM_{type}_APP_{app_name}_SIZE_{size}.data", type=vm_types, app_name=applications, size=sizes)

def perform_experiment(params):
    print("Faking experiment with params", params)
    with open("VM_{type}_APP_{app_name}_SIZE_{size}.data".format(**params), "w") as f:
        f.write("Dummy data")

def stop_vm(t):
    print("Stopping VM", t)

def previous_vm_type(wildcards):
    if wildcards.type == "small": return []
    return "VM_{}_stop.done".format(vm_types[vm_types.index(wildcards.type)-1])

rule all:
    input: all_files, "VM_{}_stop.done".format(vm_types[-1])

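# ancient() on the previous VM's stop marker means its timestamp is ignored:
# a refreshed stop.done does not force the following experiments to be re-run.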
rule perform_experiment:
    input: lambda wildcards: ancient(previous_vm_type(wildcards))
    output: "VM_{type}_APP_{app_name}_SIZE_{size}.data"
    run:
        if True:  # check vm status, start vm if it's not running
            print("Starting VM", wildcards.type)
        perform_experiment(wildcards)

rule stop_vm:
    input: expand("VM_{type}_APP_{app_name}_SIZE_{size}.data", app_name=applications, size=sizes, allow_missing=True)
    output: touch("VM_{type}_stop.done")
    run:
        stop_vm(wildcards.type)

The DAG also looks quite similar:

[DAG image]

Here is what would happen if a subset of the experiments were missing, e.g. if VM_medium_APP_eratostenes_SIZE_1000.data and VM_large_APP_eratostenes_SIZE_1000.data were deleted:

$ snakemake -np
Building DAG of jobs...
Job stats:
job                   count    min threads    max threads
------------------  -------  -------------  -------------
all                       1              1              1
perform_experiment        2              1              1
stop_vm                   2              1              1
total                     5              1              1


[Wed Dec 22 12:50:27 2021]
rule perform_experiment:
    input: VM_small_stop.done
    output: VM_medium_APP_eratostenes_SIZE_1000.data
    jobid: 10
    wildcards: type=medium, app_name=eratostenes, size=1000
    resources: tmpdir=/tmp

[Wed Dec 22 12:50:27 2021]
rule stop_vm:
    input: VM_medium_APP_eratostenes_SIZE_100.data, VM_medium_APP_eratostenes_SIZE_500.data, VM_medium_APP_eratostenes_SIZE_1000.data, VM_medium_APP_fibonacci_SIZE_100.data, VM_medium_APP_fibonacci_SIZE_500.data, VM_medium_APP_fibonacci_SIZE_1000.data
    output: VM_medium_stop.done
    jobid: 15
    wildcards: type=medium
    resources: tmpdir=/tmp

[Wed Dec 22 12:50:27 2021]
rule perform_experiment:
    input: VM_medium_stop.done
    output: VM_large_APP_eratostenes_SIZE_1000.data
    jobid: 17
    wildcards: type=large, app_name=eratostenes, size=1000
    resources: tmpdir=/tmp

[Wed Dec 22 12:50:27 2021]
rule stop_vm:
    input: VM_large_APP_eratostenes_SIZE_100.data, VM_large_APP_eratostenes_SIZE_500.data, VM_large_APP_eratostenes_SIZE_1000.data, VM_large_APP_fibonacci_SIZE_100.data, VM_large_APP_fibonacci_SIZE_500.data, VM_large_APP_fibonacci_SIZE_1000.data
    output: VM_large_stop.done
    jobid: 21
    wildcards: type=large
    resources: tmpdir=/tmp

[Wed Dec 22 12:50:27 2021]
localrule all:
    input: VM_small_APP_eratostenes_SIZE_100.data, VM_small_APP_eratostenes_SIZE_500.data, VM_small_APP_eratostenes_SIZE_1000.data, VM_small_APP_fibonacci_SIZE_100.data, VM_small_APP_fibonacci_SIZE_500.data, VM_small_APP_fibonacci_SIZE_1000.data, VM_medium_APP_eratostenes_SIZE_100.data, VM_medium_APP_eratostenes_SIZE_500.data, VM_medium_APP_eratostenes_SIZE_1000.data, VM_medium_APP_fibonacci_SIZE_100.data, VM_medium_APP_fibonacci_SIZE_500.data, VM_medium_APP_fibonacci_SIZE_1000.data, VM_large_APP_eratostenes_SIZE_100.data, VM_large_APP_eratostenes_SIZE_500.data, VM_large_APP_eratostenes_SIZE_1000.data, VM_large_APP_fibonacci_SIZE_100.data, VM_large_APP_fibonacci_SIZE_500.data, VM_large_APP_fibonacci_SIZE_1000.data, VM_large_stop.done
    jobid: 0
    resources: tmpdir=/tmp

Job stats:
job                   count    min threads    max threads
------------------  -------  -------------  -------------
all                       1              1              1
perform_experiment        2              1              1
stop_vm                   2              1              1
total                     5              1              1

This was a dry-run (flag -n). The order of jobs does not reflect the order of execution.
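Note that a single experiment can also be re-run by requesting its output file directly as a target; for example (assuming the stop markers of the earlier VM types already exist, only that one job should be scheduled):

$ snakemake --cores 1 --force VM_large_APP_eratostenes_SIZE_500.data

where --force re-runs the target even if the file is already present and up to date.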

Upvotes: 1
