Frames Catherine White

Reputation: 28192

Operating in parallel on a large constant datastructure in Julia

I have a large vector of vectors of strings: There are around 50,000 vectors of strings, each of which contains 2-15 strings of length 1-20 characters.

MyScoringOperation is a function which operates on a vector of strings (the datum) and returns an array of 10100 scores (as Float64s). It takes about 0.01 seconds to run MyScoringOperation (depending on the length of the datum).

function MyScoringOperation(state::State, datum::Vector{String})
      ...
      score::Vector{Float64} #Size of score = 10100

I have what amounts to a nested loop. The outer loop typically runs for 500 iterations:

data::Vector{Vector{String}} = loaddata()
state = init_state()
for ii in 1:500
    score_total = zeros(10100)
    for datum in data
        score_total += MyScoringOperation(state, datum)
    end
end

On one computer, on a small test case of 3,000 vectors (rather than 50,000), this takes 100-300 seconds per outer loop.

I have 3 powerful servers with Julia 0.3.9 installed (and can easily get 3 more, and then hundreds more at the next scale).


I have basic experience with @parallel; however, it seems like it is spending a lot of time copying the constant (it more or less hangs on the smaller test case).

That looks like:

data::Vector{Vector{String}} = loaddata()
state = init_state()
for ii in 1:500 

    score_total = @parallel(+) for datum in data
         MyScoringOperation(state, datum)
    end
    state = update(state, score_total)
end

My understanding of the way this implementation works with @parallel is that, for each ii:

  1. data is partitioned into a chunk for each worker
  2. that chunk is sent to each worker
  3. all workers process their chunks
  4. the main procedure sums the results as they arrive.

I would like to remove step 2, so that instead of sending a chunk of data to each worker, I just send a range of indexes to each worker and they look it up in their own copy of data. Or, even better, give each worker only its own chunk and have it reuse that chunk each time (saving a lot of RAM).
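To make the intent concrete, here is a rough, untested sketch of the shape I am after (score_range is just a name I made up; it assumes loaddata() and MyScoringOperation are available on every worker):

@everywhere data = loaddata()   # each worker builds its own copy of the constant data

# score only a range of indexes, looked up in the worker's local copy;
# ideally only `state` and the index range would travel over the network
@everywhere function score_range(state, idxs)
    total = zeros(10100)
    for datum in data[idxs]
        total += MyScoringOperation(state, datum)
    end
    total
end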


Profiling backs up my belief about how @parallel functions. For a similarly scoped problem (with even smaller data), the non-parallel version runs in 0.09 seconds, while the parallel version runs in 185 seconds. The profiler shows almost 100% of that time is spent on network IO.

Upvotes: 4

Views: 418

Answers (1)

Frames Catherine White

Reputation: 28192

This should get you started:

function get_chunks(data::Vector, nchunks::Int)
    base_len, remainder = divrem(length(data), nchunks)
    chunk_len = fill(base_len, nchunks)
    chunk_len[1:remainder] += 1 #remainder will always be less than nchunks
    function _it()
        for ii in 1:nchunks
            chunk_start = sum(chunk_len[1:ii-1]) + 1
            chunk_end = chunk_start + chunk_len[ii] - 1
            chunk = data[chunk_start:chunk_end]
            produce(chunk)
        end
    end
    Task(_it)
end
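For instance, splitting 10 items into 3 chunks gives chunk lengths [4,3,3], with the leftover element going to the first chunk:

chunks = collect(get_chunks(collect(1:10), 3))
# chunks is [[1,2,3,4], [5,6,7], [8,9,10]]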

function r_chunk_data(data::Vector)
    all_chunks = get_chunks(data, nworkers()) |> collect;
    remote_chunks = [put!(RemoteRef(pid)::RemoteRef, all_chunks[ii]) for (ii,pid) in enumerate(workers())]
    #Have to add the type annotation as otherwise it thinks that RemoteRef(pid) might return a RemoteValue
end



function fetch_reduce(red_acc::Function, rem_results::Vector{RemoteRef})
    total = nothing
    #TODO: consider strongly wrapping total in a lock, when on 0.4, so that it is guaranteed safe
    @sync for rr in rem_results
        function gather(rr)
            res = fetch(rr)
            if total === nothing
                total = res
            else
                total = red_acc(total, res)
            end
        end
        @async gather(rr)
    end
    total
end

using Pipe # the @pipe macro below comes from the Pipe.jl package

function prechunked_mapreduce(r_chunks::Vector{RemoteRef}, map_fun::Function, red_acc::Function)
    rem_results = map(r_chunks) do r_chunk
        function do_mapred()
            @assert r_chunk.where==myid()
            @pipe r_chunk |> fetch |> map(map_fun,_) |> reduce(red_acc, _)
        end
        remotecall(r_chunk.where, do_mapred)
    end
    @pipe rem_results |> convert(Vector{RemoteRef},_) |> fetch_reduce(red_acc, _)
end

r_chunk_data breaks the data into chunks (as defined by the get_chunks method) and sends each of those chunks to a different worker, where they are stored in RemoteRefs. RemoteRefs are references to memory on your other processes (and potentially on other computers).

prechunked_mapreduce does a variation on a kind of map-reduce: each worker first runs map_fun on each of its chunk's elements, then reduces over all the elements in its chunk using red_acc (a reduction accumulator function). Finally, each worker returns its result, and these are combined by reducing them all together using red_acc, this time via fetch_reduce so that we can add whichever ones complete first.

fetch_reduce is a non-blocking fetch and reduce operation. I believe it has no race conditions, though this may be because of an implementation detail in @async and @sync. When Julia 0.4 comes out, it will be easy enough to put a lock in to make it obviously have no race conditions.
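For comparison, a plain blocking version (hypothetical name below) would just fetch the results in a fixed order and then reduce; the point of fetch_reduce is that it folds results in whatever order they arrive:

# Hypothetical blocking equivalent, for comparison only: waits on each
# RemoteRef in order instead of reducing results as they complete.
blocking_fetch_reduce(red_acc::Function, rem_results::Vector{RemoteRef}) =
    reduce(red_acc, map(fetch, rem_results))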

This code isn't really battle hardened. You also might want to look at making the chunk size tunable, so that you can send more data to faster workers (if some have better network or faster CPUs).

You need to re-express your code as a map-reduce problem, which doesn't look too hard.
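For example, here is an untested sketch of how your outer loop might look, assuming MyScoringOperation, init_state and update are defined on all the workers (e.g. via @everywhere or require):

data = loaddata()
r_chunks = r_chunk_data(data)   # ship the data to the workers once
state = init_state()
for ii in 1:500
    # the closure captures the current state, so only the (changing) state,
    # not the data, has to travel over the network each iteration
    scorer = datum -> MyScoringOperation(state, datum)
    score_total = prechunked_mapreduce(r_chunks, scorer, +)
    state = update(state, score_total)
end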


Testing that with:

data = [float([eye(100),eye(100)])[:] for _ in 1:3000] #480Mb
r_chunks = r_chunk_data(data)
@time prechunked_mapreduce(r_chunks, mean, (+))

Took ~0.03 seconds, when distributed across 8 workers (none of them on the same machine as the launcher)

vs running just locally:

@time reduce(+,map(mean,data))

took ~0.06 seconds.

Upvotes: 4
