I have a large vector of vectors of strings: There are around 50,000 vectors of strings, each of which contains 2-15 strings of length 1-20 characters.
MyScoringOperation is a function which operates on a vector of strings (the datum) and returns an array of 10100 scores (as Float64s). It takes about 0.01 seconds to run MyScoringOperation, depending on the length of the datum.
function MyScoringOperation(state::State, datum::Vector{String})
    ...
    score::Vector{Float64} #Size of score = 10100
I have what amounts to a nested loop. The outer loop typically runs for 500 iterations:
data::Vector{Vector{String}} = loaddata()
state = init_state()
for ii in 1:500
    score_total = zeros(10100)
    for datum in data
        score_total += MyScoringOperation(state, datum)
    end
end
On one computer, on a small test case of 3000 (rather than 50,000) this takes 100-300 seconds per outer loop.
I have 3 powerful servers with Julia 0.3.9 installed (and can easily get 3 more, and then hundreds more at the next scale).
I have basic experience with @parallel; however, it seems like it is spending a lot of time copying the data (it more or less hangs on the smaller test case).
That looks like:
data::Vector{Vector{String}} = loaddata()
state = init_state()
for ii in 1:500
    score_total = @parallel(+) for datum in data
        MyScoringOperation(state, datum)
    end
    state = update(state, score_total)
end
My understanding of the way this implementation works with @parallel is that, for each ii, it:

1. partitions data into a chunk for each worker
2. sends that chunk to each worker
3. has each worker run MyScoringOperation over its chunk and reduces the results with +

I would like to remove step 2, so that instead of sending a chunk of data to each worker, I just send a range of indexes to each worker, and they look it up from their own copy of data. Or even better, give each worker only its own chunk once, and have it reuse it each time (saving a lot of RAM). A rough sketch of what I mean is below.
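For concreteness, this is roughly what I have in mind (not code I have working; loaddata and MyScoringOperation would need to be defined on every worker, and score_range, nw and ranges are just illustrative names):

@everywhere data = loaddata()   #every worker loads its own copy; the data never crosses the network

@everywhere function score_range(state, r::UnitRange{Int})
    total = zeros(10100)
    for ii in r
        total += MyScoringOperation(state, data[ii])
    end
    total
end

#state comes from the existing outer loop; only the index ranges and state are shipped
nw = nworkers()
ranges = [iround((k-1)*length(data)/nw)+1 : iround(k*length(data)/nw) for k in 1:nw]
score_total = reduce(+, pmap(r -> score_range(state, r), ranges))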
Profiling backs up my belief about the functioning of @parallel. For a similarly scoped problem (with even smaller data), the non-parallel version runs in 0.09 seconds, and the parallel version runs in 185 seconds. The profiler shows almost 100% of this time is spent interacting with network IO.
Upvotes: 4
Views: 418
This should get you started:
#Lazily produces nchunks roughly equal-sized chunks of data (as a Task, Julia 0.3/0.4 style)
function get_chunks(data::Vector, nchunks::Int)
    base_len, remainder = divrem(length(data), nchunks)
    chunk_len = fill(base_len, nchunks)
    chunk_len[1:remainder] += 1 #remainder will always be less than nchunks
    function _it()
        for ii in 1:nchunks
            chunk_start = sum(chunk_len[1:ii-1]) + 1
            chunk_end = chunk_start + chunk_len[ii] - 1
            chunk = data[chunk_start:chunk_end]
            produce(chunk)
        end
    end
    Task(_it)
end
#Breaks data into one chunk per worker and stores each chunk in a RemoteRef on that worker
function r_chunk_data(data::Vector)
    all_chunks = get_chunks(data, nworkers()) |> collect
    remote_chunks = [put!(RemoteRef(pid)::RemoteRef, all_chunks[ii]) for (ii, pid) in enumerate(workers())]
    #Have to add the type annotation as otherwise it thinks that RemoteRef(pid) might return a RemoteValue
end
#Fetches remote results as they complete and reduces them with red_acc, without blocking on order
function fetch_reduce(red_acc::Function, rem_results::Vector{RemoteRef})
    total = nothing
    #TODO: consider strongly wrapping total in a lock, when in 0.4, so that it is guaranteed safe
    @sync for rr in rem_results
        function gather(rr)
            res = fetch(rr)
            if total === nothing
                total = res
            else
                total = red_acc(total, res)
            end
        end
        @async gather(rr)
    end
    total
end
using Pipe #the @pipe macro below comes from the Pipe.jl package

#Maps map_fun over each remote chunk on the worker that holds it, reduces locally with red_acc,
#then combines the per-worker results with fetch_reduce
function prechunked_mapreduce(r_chunks::Vector{RemoteRef}, map_fun::Function, red_acc::Function)
    rem_results = map(r_chunks) do r_chunk
        function do_mapred()
            @assert r_chunk.where == myid()
            @pipe r_chunk |> fetch |> map(map_fun, _) |> reduce(red_acc, _)
        end
        remotecall(r_chunk.where, do_mapred)
    end
    @pipe rem_results |> convert(Vector{RemoteRef}, _) |> fetch_reduce(red_acc, _)
end
r_chunk_data breaks the data into chunks (defined by the get_chunks method) and sends those chunks each to a different worker, where they are stored in RemoteRefs. The RemoteRefs are references to memory on your other processes (and potentially other computers).
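As a quick illustration of the chunking (a sanity check, not part of the solution): splitting 10 items over 3 chunks gives lengths 4, 3 and 3, since the remainder is spread over the first chunks:

chunks = collect(get_chunks(collect(1:10), 3))
@assert map(length, chunks) == [4, 3, 3]
@assert vcat(chunks...) == collect(1:10)  #the chunks partition the data in order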
prechunked_mapreduce does a variation on a kind of map-reduce: each worker first runs map_fun on each of its chunk's elements, then reduces over all the elements in its chunk using red_acc (a reduction accumulator function). Finally each worker returns its result, which is then combined by reducing them all together using red_acc, this time via fetch_reduce so that we can add the first ones completed first.
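To connect this back to the original problem, a rough sketch of how it could slot into the outer loop (loaddata, init_state, update and MyScoringOperation are the functions from the question; state still gets shipped each iteration inside the closure, but the data only travels once):

#assumes MyScoringOperation is defined on all workers (e.g. via @everywhere or require)
data = loaddata()
state = init_state()
r_chunks = r_chunk_data(data)   #each chunk travels to its worker exactly once
for ii in 1:500
    score_total = prechunked_mapreduce(r_chunks,
                                       datum -> MyScoringOperation(state, datum),
                                       +)
    state = update(state, score_total)
end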
fetch_reduce is a non-blocking fetch and reduce operation. I believe it has no race conditions, though this may be because of an implementation detail in @async and @sync. When Julia 0.4 comes out, it would be easy enough to put a lock in to make it obviously have no race conditions.
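If you would rather not rely on that, a variant that sidesteps the shared total entirely might look like the following; each task writes to its own slot and the reduction only happens after @sync, at the cost of losing the add-as-they-complete behaviour:

function fetch_reduce_slots(red_acc::Function, rem_results::Vector{RemoteRef})
    results = Array(Any, length(rem_results))  #one slot per remote result, nothing shared
    @sync for (ii, rr) in enumerate(rem_results)
        function gather_into(ii, rr)
            results[ii] = fetch(rr)
        end
        @async gather_into(ii, rr)
    end
    reduce(red_acc, results)  #runs only after all tasks have finished
end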
This code isn't really battle hardened. You also might want to look at making the chunk size tunable, so that you can send more data to faster workers (if some have better network or faster CPUs); a rough sketch of that idea is below.
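For example, a hypothetical weight-aware variant of get_chunks; the weights vector (one entry per worker) is an assumed relative speed estimate, and this is only an illustration, not tested code:

#chunk sizes proportional to a per-worker weight (e.g. a crude relative speed estimate)
function get_weighted_chunks(data::Vector, weights::Vector{Float64})
    nchunks = length(weights)
    cum = cumsum(weights) / sum(weights)                       #cumulative share per chunk
    bounds = vcat(0, [iround(length(data) * c) for c in cum])  #chunk boundaries into data
    function _it()
        for ii in 1:nchunks
            produce(data[bounds[ii]+1 : bounds[ii+1]])
        end
    end
    Task(_it)
end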
You need to reexpress your code as a map-reduce problem, which doesn't look too hard.
Testing that with:
data = [float([eye(100),eye(100)])[:] for _ in 1:3000] #480Mb
r_chunks = r_chunk_data(data)
@time prechunked_mapreduce(r_chunks, mean, (+))
Took ~0.03 seconds, when distributed across 8 workers (none of them on the same machine as the launcher)
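(For completeness: a setup like this assumes the remote workers were attached beforehand, e.g. with something along these lines, where the host names are placeholders.)

addprocs(["server1", "server2", "server3"])  #attach remote workers over SSH (host names made up)
#or start julia with a machine file instead: julia --machinefile machines.txt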
vs running just locally:
@time reduce(+,map(mean,data))
took ~0.06 seconds.
Upvotes: 4