I have a large matrix, data, that I want to "organize" in a certain way. The matrix has 5 columns and about 2 million rows. The first 4 columns are characteristics of each observation (integers), and the last column is the outcome variable I'm interested in (real numbers). I want to organize this matrix into an array of arrays. Since data is very large, I'm trying to parallelize this operation:
addprocs(3)
@everywhere data = readcsv("datalocation", Int)
@everywhere const Z = 65
@everywhere const z = 16
@everywhere const Y = 16
@everywhere const y = 10
@everywhere const arr = Array{Vector}(Z-z+1,Y-y+1,Z-z+1,Y-y+1)
@parallel (vcat) for a1 in z:Z, e1 in y:Y, a2 in z:Z, e2 in y:Y
    arr[a1-z+1,e1-y+1,a2-z+1,e2-y+1] = data[(data[:,1].==a1) & (data[:,2].==e1) & (data[:,3].==a2) & (data[:,4].==e2), end]
end
However, I get an error when I try to run the for loop:
Error: syntax: invalid assignment location
After the loop finishes, I would like arr to be available to all processes. What am I doing wrong?
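For readers on Julia 1.x: @parallel was renamed @distributed and moved to the Distributed standard library, and its documented form is @distributed [reducer] for var = range, with a single loop variable, so the four-variable loop header above does not match it. The loop body also runs on the workers, so an assignment into arr there would only change a worker's own copy, not the array on the master. The sketch below (assuming Julia 1.x) loops over one flat index into a vector of key tuples, returns a key => values pair from each iteration, and lets vcat collect the pairs on the master. DATA, KEYS and the tiny key ranges are hypothetical stand-ins for data, z:Z and y:Y.

```julia
using Distributed
addprocs(2)  # hypothetical worker count

# Toy stand-in for the real 5-column matrix (columns 1-4 form the key)
@everywhere const DATA = [16 10 16 10 100.0;
                          16 10 16 11 200.0;
                          16 10 16 10 300.0]

# Hypothetical, deliberately tiny key ranges instead of z:Z and y:Y
@everywhere const KEYS = vec(collect(Iterators.product(16:16, 10:10, 16:16, 10:11)))

# One loop variable; each iteration returns a one-element vector holding a
# key => values pair, and the vcat reducer gathers all pairs on the master.
pairs_found = @distributed (vcat) for i in 1:length(KEYS)
    a1, e1, a2, e2 = KEYS[i]
    mask = (DATA[:, 1] .== a1) .& (DATA[:, 2] .== e1) .&
           (DATA[:, 3] .== a2) .& (DATA[:, 4] .== e2)
    [KEYS[i] => DATA[mask, end]]
end

groups = Dict(pairs_found)
println(groups)
```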
EDIT:
The input matrix data looks like this (rows in no particular order):
16 10 16 10 100
16 10 16 11 200
20 12 21 13 500
16 10 16 10 300
20 12 21 13 500
Notice that some rows can be repeated, and some others will have the same "key" but a different fifth column.
The output I want looks like this (notice how I'm using the indices of arr as "keys" for a "dictionary"):
arr[16-z+1, 10-y+1, 16-z+1, 10-y+1] = [100, 300]
arr[16-z+1, 10-y+1, 16-z+1, 11-y+1] = [200]
arr[20-z+1, 12-y+1, 21-z+1, 13-y+1] = [500, 500]
That is, the element of arr at index (16-z+1, 10-y+1, 16-z+1, 10-y+1) is the vector [100, 300]. I don't care about the order of the rows or about the order of the values within each vector.
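Because each row belongs to exactly one key, the grouping described above can also be built in a single pass over the rows, instead of one full-matrix mask per key (with the ranges in the question that is 50 * 7 * 50 * 7 = 122,500 scans of all 2 million rows). The following is a minimal serial sketch for Julia 1.x, not the asker's code: group_rows and to_array are hypothetical helper names, a Dict keyed by the first four columns plays the role of the "dictionary", and to_array copies the result into the question's 4-D layout.

```julia
# Hypothetical helper: group the 5th column by the key formed by columns 1-4.
function group_rows(data::AbstractMatrix)
    groups = Dict{NTuple{4,Int},Vector{Float64}}()
    for r in 1:size(data, 1)
        key = (Int(data[r, 1]), Int(data[r, 2]), Int(data[r, 3]), Int(data[r, 4]))
        # get! creates an empty vector the first time a key is seen
        push!(get!(Vector{Float64}, groups, key), data[r, 5])
    end
    return groups
end

# Hypothetical helper: copy the groups into the question's 4-D array,
# using the same index offsets a1-z+1, e1-y+1, a2-z+1, e2-y+1.
function to_array(groups, z, Z, y, Y)
    dims = (Z - z + 1, Y - y + 1, Z - z + 1, Y - y + 1)
    arr = [Float64[] for _ in CartesianIndices(dims)]  # a distinct empty vector per cell
    for ((a1, e1, a2, e2), vals) in groups
        arr[a1 - z + 1, e1 - y + 1, a2 - z + 1, e2 - y + 1] = vals
    end
    return arr
end

# The example input from the question
data = [16 10 16 10 100;
        16 10 16 11 200;
        20 12 21 13 500;
        16 10 16 10 300;
        20 12 21 13 500]

groups = group_rows(data)
arr = to_array(groups, 16, 65, 10, 16)
println(groups[(16, 10, 16, 10)])
println(arr[16 - 16 + 1, 10 - 10 + 1, 16 - 16 + 1, 10 - 10 + 1])
```

A pass like this is linear in the number of rows. If it still needs to run in parallel, one option is to have each worker group its own block of rows and merge the per-worker Dicts with mergewith(vcat, ...) (Julia 1.5 or later).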
Answer:
Does this work for you? I simulated your data by repeating your sample rows 1000 times. It's not as elegant as I would have liked; in particular, I couldn't get remotecall_fetch() working the way I wanted (even when wrapping it in @async), so I split the calling and the fetching into two steps. Let me know how this works for you.
addprocs(n)  # n = number of worker processes to add
@everywhere begin
    if myid() != 1
        # simulate the full data set by repeating each sample row 1000 times
        multiplier = 10^3;
        Data = readdlm("/path/to/Input.txt")
        global data = kron(Data,ones(multiplier));
        println(size(data))
    end
end
@everywhere begin
    # return the last column of every row whose first four columns equal (a1, e1, a2, e2)
    function Select_Data(a1, e1, a2, e2, data=data)
        return data[(data[:,1].==a1) & (data[:,2].==e1) & (data[:,3].==a2) & (data[:,4].==e2), end]
    end
end
n_workers = nworkers()
# cycle through the worker ids 2, 3, ..., n_workers + 1
function next_pid(pid, n_workers)
    if pid <= n_workers
        return pid + 1
    else
        return 2
    end
end
# Z, z, Y and y are the constants defined in the question
const arr = Array{Any}(Z-z+1,Y-y+1,Z-z+1,Y-y+1);
println("Beginning Processing Work")
@sync begin
    pid = 2
    for a1 in z:Z, e1 in y:Y, a2 in z:Z, e2 in y:Y
        pid = next_pid(pid, n_workers)
        # store the handle for now; the job itself runs on worker pid
        arr[a1-z+1,e1-y+1,a2-z+1,e2-y+1] = remotecall(pid, Select_Data, a1, e1, a2, e2)
    end
end
println("Retrieving Completed Jobs")
@sync begin
    for a1 in z:Z, e1 in y:Y, a2 in z:Z, e2 in y:Y
        # replace each handle with its result, blocking until it is ready
        arr[a1-z+1,e1-y+1,a2-z+1,e2-y+1] = fetch(arr[a1-z+1,e1-y+1,a2-z+1,e2-y+1])
    end
end
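The two-pass pattern above (launch every job with remotecall, then fetch every result) can be sketched for Julia 1.x as below. In 1.x the argument order is remotecall(f, pid, args...) rather than remotecall(pid, f, args...), and indexing workers() with mod1 replaces next_pid. The sketch also shows the one-pass @sync/@async form with remotecall_fetch, and pmap for comparison. square_sum and inputs are hypothetical stand-ins for Select_Data and its arguments.

```julia
using Distributed
addprocs(2)  # hypothetical worker count

# Hypothetical job standing in for Select_Data; @everywhere defines it on all processes.
@everywhere square_sum(n) = sum(i^2 for i in 1:n)

inputs = 1:10
ws = workers()

# Pass 1: remotecall returns a Future immediately, so every job starts without waiting.
# mod1 cycles through the worker list, like next_pid above.
futures = [remotecall(square_sum, ws[mod1(k, length(ws))], n)
           for (k, n) in enumerate(inputs)]

# Pass 2: fetch blocks until each job has finished and returns its value.
results = fetch.(futures)

# One-pass alternative: each @async task waits on its own remotecall_fetch,
# and @sync waits until all of the tasks are done.
results2 = Vector{Int}(undef, length(inputs))
@sync for (k, n) in enumerate(inputs)
    @async results2[k] = remotecall_fetch(square_sum, ws[mod1(k, length(ws))], n)
end

# For plain map-style work, pmap does the scheduling and collection in one call.
results3 = pmap(square_sum, inputs)
println(results)
```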
Answer:
Note: I initially misinterpreted your question. I had thought you were trying to split the data among your workers, but I now see that isn't what you were after. I wrote up some simplified examples of how that can be done, and I'll leave them here in case anyone finds them useful in the future.
Get started:
writedlm("path/to/data.csv", rand(100,10), ',')
addprocs(4)
Option 1:
function sendto(p::Int; args...)
    for (nm, val) in args
        @spawnat(p, eval(Main, Expr(:(=), nm, val)))
    end
end
Data = readcsv("path/to/data.csv")
for (idx, pid) in enumerate(workers())
    Start = (idx-1)*25 + 1
    End = Start + 24
    sendto(pid, Data = Data[Start:End,:])
end
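A version of sendto for Julia 1.x might look like the sketch below. It assumes the Distributed standard library and spells the two-argument eval call as Core.eval(Main, ex); it also swaps @spawnat for remotecall_wait, so sendto returns only after the worker has made the assignment (@spawnat returns immediately). The 100-by-10 random matrix and the two-worker split are illustrative only.

```julia
using Distributed
addprocs(2)  # hypothetical worker count; the row split below assumes exactly two

# Same idea as Option 1's sendto: define each keyword argument as a global in Main on worker p.
function sendto(p::Int; args...)
    for (nm, val) in args
        # remotecall_wait returns once the assignment has run on p,
        # without copying the assigned value back to the caller
        remotecall_wait(Core.eval, p, Main, Expr(:(=), nm, val))
    end
end

data = rand(100, 10)
for (idx, pid) in enumerate(workers())
    rows = ((idx - 1) * 50 + 1):(idx * 50)  # 50 rows per worker
    sendto(pid, Data = data[rows, :])
end
```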
Option 2:
@everywhere begin
    if myid() != 1
        Start = (myid()-2)*25 + 1
        End = Start + 24
        println(Start)
        println(End)
        Data = readcsv("path/to/data.csv")[Start:End,:]
    end
end
# check on each worker what it received
@everywhere if myid()!= 1 println(typeof(Data)) end
@everywhere if myid()!= 1 println(size(Data)) end
Option 3:
for (idx, pid) in enumerate(workers())
    Start = (idx-1)*25 + 1
    End = Start + 24
    sendto(pid, Start = Start, End = End)
end
@everywhere if myid()!= 1 Data = readcsv("path/to/data.csv")[Start:End,:] end
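Options 2 and 3 hard-code 25 rows per worker, which only fits a 100-row file split over exactly four workers. A small helper can compute nearly equal contiguous row ranges for any row count and worker count; chunk_rows below is a hypothetical name for such a helper, not part of the Distributed library.

```julia
# Hypothetical helper: split rows 1:nrows into nchunks contiguous ranges whose
# lengths differ by at most one, and return the k-th range (1 <= k <= nchunks).
function chunk_rows(k::Int, nrows::Int, nchunks::Int)
    base, extra = divrem(nrows, nchunks)
    # the first `extra` chunks each get one additional row
    start = (k - 1) * base + min(k - 1, extra) + 1
    stop = start + base - 1 + (k <= extra ? 1 : 0)
    return start:stop
end

# 100 rows over four workers reproduces the hard-coded 25-row blocks above;
# 10 rows over three workers gives ranges of length 4, 3 and 3.
for k in 1:4
    println(chunk_rows(k, 100, 4))
end
for k in 1:3
    println(chunk_rows(k, 10, 3))
end
```

On the master, the range for the idx-th worker could then be shipped as in Option 3, for example r = chunk_rows(idx, nrows, nworkers()) followed by sendto(pid, Start = first(r), End = last(r)).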