Reputation: 1273
First my question: how can I apply a set of filter functions to a big DataFrame in parallel without shipping a copy of the data to every worker, and then vcat() the partial results back together?
Now the details:
I have this program:
data = DataFrames.readtable("...") # a big baby (~100MB)
filter_functions = [ fct1, fct2, fct3 ... ] # (x::DataFrame) -> y::DataFrame
filtered_data = @parallel vcat for fct in filter_functions
fct(data)::DataFrame
end
It works nicely functionality-wise, but each parallel call to fct(data) copies the whole data frame to another worker, making everything painfully slow.
Ideally, I would like to load the data once and have each worker always reuse its own pre-loaded copy. I came up with this code to do so:
@everywhere data = DataFrames.readtable("...") # a big baby (~100MB)
@everywhere filter_functions = [ fct1, fct2, fct3 ... ] # (x::DataFrame) -> y::DataFrame
@everywhere for i in 1:length(filter_functions)
    if (i - 1) % nworkers() == myid() - 2 # distribute the functions across workers; worker ids start at 2
        fct = filter_functions[i]
        filtered_data_temp = fct(data)
    end
    # How to vcat all the filtered_data_temp ?
end
But now I have another problem: I cannot figure out how to vcat() all the filtered_data_temp results into a single variable on the process with myid() == 1.
I would very much appreciate any insight.
Note: I am aware of Operating in parallel on a large constant datastructure in Julia. Yet, I don't believe it applies to my problem because all my filter_functions do operate on the array as a whole.
Upvotes: 9
Views: 1452
Reputation: 1273
In the end, I found the solution to my question in Julia: How to copy data to another processor in Julia.
In particular, it introduces the following primitive for retrieving a variable from another process:
getfrom(p::Int, nm::Symbol; mod=Main) = fetch(@spawnat(p, getfield(mod, nm)))
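To make the primitive concrete, here is a minimal, self-contained sketch of how getfrom behaves; the variable x and its values are toy stand-ins of my own, not the poster's data, and on Julia 0.7+ you additionally need using Distributed (it was implicit in the old versions this post targets):

```julia
using Distributed            # needed on Julia >= 0.7; implicit in older versions
addprocs(2)

# The primitive from the linked answer: fetch a global binding `nm`
# from module `mod` on process `p`.
getfrom(p::Int, nm::Symbol; mod=Main) = fetch(@spawnat(p, getfield(mod, nm)))

@everywhere x = myid() * 10  # each process gets its own value of x

w = first(workers())         # a worker id, normally 2
println(getfrom(w, :x))      # that worker's x, not the master's
```

Note that getfrom copies the value back to the caller; the point of the pattern is that each worker computes on its local data and only the (small) result crosses processes.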
Below is how I am using it:
@everywhere data = DataFrames.readtable("...") # a big baby (~100MB)
@everywhere filter_functions = [ fct1, fct2, fct3 ... ] # (x::DataFrame) -> y::DataFrame
# Execute the filter functions
@everywhere local_results = ... # some type, initialized empty
@everywhere for i in 1:length(filter_functions)
    if (i - 1) % nworkers() == myid() - 2 # distribute the functions across workers; worker ids start at 2
        fct = filter_functions[i]
        filtered_data_temp = fct(data)
        global local_results = vcat(local_results, filtered_data_temp)
    end
end
# Concatenate all the local results on the master process
all_results = ... # some type, initialized empty
for wid in workers() # workers() returns the worker ids, e.g. [2, 3, ...]
    worker_local_results = getfrom(wid, :local_results)
    all_results = vcat(all_results, worker_local_results)
end
Upvotes: 4
Reputation: 2718
You might want to look into DistributedArrays.jl and load your data into a distributed array.
EDIT: Probably something like this:
data = DataFrames.readtable("...")
dfiltered_data = distribute(data) #distributes data among processes automagically
filter_functions = [ fct1, fct2, fct3 ... ]
for fct in filter_functions
    dfiltered_data = fct(dfiltered_data)::DataFrame
end
You can also check the package's unit tests for more examples.
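One caveat worth knowing: distribute() works on AbstractArrays, and a DataFrame is not one, so in practice you would distribute an underlying column (or a vector of row chunks) rather than the DataFrame itself. A minimal sketch with a plain vector as a stand-in, assuming the DistributedArrays.jl package is installed:

```julia
using Distributed
addprocs(2)
@everywhere using DistributedArrays  # requires the DistributedArrays.jl package

# distribute() needs an AbstractArray, so use a vector as a stand-in
# for one column of the big DataFrame.
data = collect(1.0:100.0)
ddata = distribute(data)         # chunks now live on the workers

squared = map(x -> x^2, ddata)   # map runs locally on each worker's chunk
println(sum(squared))
```

Because each worker only ever touches its own chunk, the 100MB payload is split across processes once instead of being copied whole to every worker on every call.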
Upvotes: 10