Juergen

Reputation: 861

dask: shared memory in parallel model

I've read the dask documentation, blogs and SO, but I'm still not 100% clear on how to do it. My use case:

In summary:

Possible pitfalls / issues:

The most efficient solution seems to be to load the ref-data into memory only once and make it available read-only to the multiple other processes that process the events.

Scale out to multiple computers by loading the ref-data on each computer, and push the filenames to the computers for execution.

Any idea how to achieve this?

Thanks a lot for your help

Upvotes: 5

Views: 3137

Answers (3)

Wall-E

Reputation: 713

I have also come across a similar issue of running embarrassingly parallel jobs that all fetch data from the same lookup "reference" table (or any big in-memory read-only variable needed by each instance of the parallel process). As long as you stay in an environment that follows "copy-on-write" semantics (e.g. Linux), placing the lookup table in the global scope always worked very efficiently, as explained nicely here: Shared-memory objects in multiprocessing

Here is a simple parallel workflow:

from multiprocessing import Pool

# Load your reference data, do that only once 
# here in the parent process
my_ref_lookup = load_ref_data(your_data_file)

def your_parallel_function(my_file_path):
    my_new_data = load_data(my_file_path)
    # process my_new_data with some lookup in my_ref_lookup 
    # which is known from the parent process. 

    processed_data = do_stuff(my_new_data)

    # you could here write something on disk
    # and/or return the processed_data

    return processed_data

with Pool(processes=5) as pool:
    list_of_result = pool.map(your_parallel_function, your_list_of_file_paths)

Here your_parallel_function will execute in parallel across e.g. 5 workers, processing 5 files from your_list_of_file_paths at a time, and all child processes will have access to my_ref_lookup without having to copy it.

After some time spent with Dask and bag collections, I never found a similar or simpler behavior than this. In my attempts at using Dask, a read-only variable shared this way in the global scope ended up being copied by as many workers as needed it, which exploded the memory and made my kernel crash. I have never seen this case handled in any of the Dask documentation. The only remotely related reference to this in the Dask documentation is about avoiding global state: https://docs.dask.org/en/latest/delayed-best-practices.html#avoid-global-state but that shows the case of a shared variable being modified inside the delayed function, which is different from the current issue of just sharing "read-only" data.
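
For illustration, here is a minimal sketch of the pattern that caused this blow-up in my case (the loader and field names are hypothetical): a task function that reaches into a large global lookup, run by Dask's process-based workers, ends up with one copy of that lookup per worker process rather than a single shared copy.

import json
import dask.bag as db

# hypothetical loader, as in the snippet above; runs once in the client process
big_lookup = load_ref_data(your_data_file)

def enrich(record):
    # big_lookup is taken from global scope; with process-based Dask workers
    # each worker ends up holding its own copy of it (shipped with the
    # serialized task or rebuilt per process), instead of one
    # copy-on-write copy shared with the parent
    return {**record, "ref": big_lookup.get(record["key"])}

bag = db.read_text("events-*.json").map(json.loads).map(enrich)
result = bag.compute()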

Upvotes: 3

mdurant

Reputation: 28683

Some things you can think about:

  • each dask worker process can have any number of threads. Sharing data between threads does not require copying, but sharing between processes does; so you should experiment with the process/thread mix to find what is optimal for you (see the sketch after this list)

  • it is generally better to load data in the workers rather than to pass it from the client, even though replicating amongst the processes is fairly efficient. If you have the memory to persist the ref-data for every worker, that is obviously best, although Dask tries its best to account for common intermediate dependencies for tasks.

  • every task introduces some overhead, and may result in intermediates being moved from one machine to another. Although some linear chains of processes may be fused at optimisation time, you are probably better off writing a function that calls your stages in sequence, and calling that function as a single task, once for each part of your data.
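
On the first point, here is a minimal sketch of experimenting with the process/thread mix on a local cluster; the 2x4 split is just a placeholder to tune against your own workload:

from dask.distributed import Client

# e.g. 2 worker processes with 4 threads each: data used by tasks running in
# the same process is shared between its threads without copying, while data
# needed by different processes has to be replicated
client = Client(n_workers=2, threads_per_worker=4)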

Example

f = client.submit(read_function, ref_filename)
out = client.map(process_function, list_of_inputs, ref=f)

where process_function in this example takes a single input (which may be a tuple) and an optional keyword argument ref=, which is the loaded ref-data. Dask will replicate the reference data to workers as required.
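
For concreteness, a hypothetical process_function matching that description could look like the following; load_data and do_stuff are placeholders, not part of the answer:

def process_function(item, ref=None):
    # 'ref' receives the result of the future f, i.e. the loaded ref-data
    new_data = load_data(item)        # placeholder: read one input file
    return do_stuff(new_data, ref)    # placeholder: process it against the ref-data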

Upvotes: 1

Juergen

Reputation: 861

I've found a blog post about the (Python) Ray framework. Even though Ray's business purpose is very different, they faced the same core requirement: read-only shared-memory dataframes used by many parallel processes. The post describes and explains why they settled on Apache Arrow and pyarrow. Sounds interesting and we'll give it a try for our use case.
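
For example, one way pyarrow can give several processes a read-only view of the same ref-data is to write it once in Arrow (Feather v2) format and memory-map it from every process; the file name, ref_dataframe, and the choice of Feather here are assumptions for illustration, not something taken from the blog post:

import pyarrow.feather as feather

# one-time step (e.g. in a preparation job): persist the ref-data as Arrow;
# ref_dataframe is a hypothetical pandas DataFrame holding the ref-data
feather.write_feather(ref_dataframe, "ref_data.feather")

# in each worker process: memory-map the file instead of loading a private copy;
# the OS page cache backs every process with the same physical pages
ref_table = feather.read_table("ref_data.feather", memory_map=True)
# work on the Arrow table directly where possible; converting to pandas may copy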

Upvotes: 0
