amrods

Reputation: 2131

Parallelize data processing

I have a large matrix, data, that I want to "organize" in a certain way. The matrix has 5 columns and about 2 million rows. The first 4 columns are characteristics of each observation (these are integers) and the last column is the outcome variable I'm interested in (this contains real numbers). I want to organize this matrix into an Array of Arrays. Since the data is very large, I'm trying to parallelize this operation:

addprocs(3)

@everywhere data = readcsv("datalocation", Int)

@everywhere const Z = 65
@everywhere const z = 16
@everywhere const Y = 16
@everywhere const y = 10

@everywhere const arr = Array{Vector}(Z-z+1,Y-y+1,Z-z+1,Y-y+1)

@parallel (vcat) for a1 in z:Z, e1 in y:Y, a2 in z:Z, e2 in y:Y
    arr[a1-z+1,e1-y+1,a2-z+1,e2-y+1] = data[(data[:,1].==a1) & (data[:,2].==e1) & (data[:,3].==a2) & (data[:,4].==e2), end]
end

However I get an error when I try to run the for loop:

Error: syntax: invalid assignment location

After the loop is finished, I would like arr to be available on all processes. What am I doing wrong?

EDIT: The input matrix data looks like this (rows in no particular order):

16   10   16   10   100
16   10   16   11   200
20   12   21   13   500
16   10   16   10   300
20   12   21   13   500

Notice that some rows can be repeated, and some others will have the same "key" but a different fifth column.

The output I want looks like this (notice how I'm using the dimensions of arr as "keys" for a "dictionary"):

arr[16-z+1, 10-y+1, 16-z+1, 10-y+1] = [100, 300]
arr[16-z+1, 10-y+1, 16-z+1, 11-y+1] = [200]
arr[20-z+1, 12-y+1, 21-z+1, 13-y+1] = [500, 500]

That is, the element of arr at index (16-z+1, 10-y+1, 16-z+1, 10-y+1) is the vector [100, 300]. I don't care about the ordering of the rows or the ordering of the last column of vectors.
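For reference, here is a sequential version of the grouping I'm after, using a Dict keyed by the first four columns (a sketch for illustration only; sample and groups are made-up names):

```julia
# Sequential sketch of the desired grouping (illustrative names):
# collect the fifth column of every row sharing the same 4-integer key.
sample = [16 10 16 10 100;
          16 10 16 11 200;
          20 12 21 13 500;
          16 10 16 10 300;
          20 12 21 13 500]

groups = Dict{NTuple{4,Int}, Vector{Float64}}()
for i in 1:size(sample, 1)
    key = (sample[i,1], sample[i,2], sample[i,3], sample[i,4])
    push!(get!(groups, key, Float64[]), sample[i,5])
end

groups[(16, 10, 16, 10)]   # [100.0, 300.0]
```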

Upvotes: 2

Views: 116

Answers (2)

Michael Ohlrogge

Reputation: 10980

Does this work for you? I tried to simulate your data by repeating the snippet you gave 1,000 times. It's not as elegant as I would have wanted; in particular, I couldn't quite get remotecall_fetch() working the way I wanted (even when wrapping it in @async), so I had to split the calling and the fetching into two steps. Let me know how this works for you.

addprocs(n)

@everywhere begin
    if myid() != 1
        multiplier = 10^3;
        Data = readdlm("/path/to/Input.txt")
        global data = kron(Data,ones(multiplier));
        println(size(data))
    end
end

@everywhere begin
    function Select_Data(a1, e1, a2, e2, data=data)
        return data[(data[:,1].==a1) & (data[:,2].==e1) & (data[:,3].==a2) & (data[:,4].==e2), end]
    end
end

n_workers = nworkers()
function next_pid(pid, n_workers)
    if pid <= n_workers
        return pid + 1
    else
        return 2
    end
end

const arr = Array{Any}(Z-z+1,Y-y+1,Z-z+1,Y-y+1);
println("Beginning Processing Work")
@sync begin
    pid = 2
    for a1 in z:Z, e1 in y:Y, a2 in z:Z, e2 in y:Y
        pid = next_pid(pid, n_workers)
        arr[a1-z+1,e1-y+1,a2-z+1,e2-y+1] = remotecall(pid, Select_Data, a1, e1, a2, e2)
    end
end
println("Retrieving Completed Jobs")
@sync begin
    pid = 2
    for a1 in z:Z, e1 in y:Y, a2 in z:Z, e2 in y:Y
        arr[a1-z+1,e1-y+1,a2-z+1,e2-y+1] = fetch(arr[a1-z+1,e1-y+1,a2-z+1,e2-y+1])
    end
end
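For comparison, here is a self-contained sketch (my addition, not part of the code above) of the same two-phase idea using pmap, which takes care of the worker scheduling that next_pid does by hand; select_data here is a dummy stand-in for Select_Data, and the ranges are shrunk so the example runs on its own:

```julia
using Distributed   # pmap lives here on Julia 1.0+; it was in Base in older versions

# Dummy stand-ins for the real ranges and Select_Data (sketch only).
z, Z, y, Y = 1, 2, 1, 2
select_data(a1, e1, a2, e2) = Float64[a1 + e1 + a2 + e2]

# pmap distributes the calls over whatever workers exist (and runs
# serially when there are none).
ks = vec([(a1, e1, a2, e2) for a1 in z:Z, e1 in y:Y, a2 in z:Z, e2 in y:Y])
out = Dict(zip(ks, pmap(k -> select_data(k...), ks)))

out[(1, 1, 1, 1)]   # [4.0]
```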

Upvotes: 1

Michael Ohlrogge

Reputation: 10980

Note: I initially misinterpreted your question. I thought you were trying to split the data amongst your workers, but I now see that isn't quite what you were after. I wrote up some simplified examples of how that can be accomplished, and I'll leave them up in case anyone finds them useful in the future.

Get started:

writedlm("path/to/data.csv", rand(100,10), ',')
addprocs(4)

Option 1:

function sendto(p::Int; args...)
    for (nm, val) in args
        @spawnat(p, eval(Main, Expr(:(=), nm, val)))
    end
end

Data = readcsv("/path/to/data.csv")

for (idx, pid) in enumerate(workers())
    Start = (idx-1)*25 + 1
    End = Start + 24
    sendto(pid, Data = Data[Start:End, :])
end
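A small self-contained check that the variable really lands on the worker (my sketch, not part of the original answer; sendto_sync is a variant of sendto above that waits for each transfer instead of firing and forgetting):

```julia
using Distributed
addprocs(1)

# Variant of sendto that blocks until each assignment completes on the worker.
function sendto_sync(p::Int; args...)
    for (nm, val) in args
        wait(@spawnat(p, Core.eval(Main, Expr(:(=), nm, val))))
    end
end

w = first(workers())
sendto_sync(w, x = 42)
@fetchfrom w x   # 42
```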

Option 2:

@everywhere begin
    if myid() != 1
        Start = (myid()-2)*25 + 1
        End = Start + 24
        println(Start)
        println(End)
        Data = readcsv("path/to/data.csv")[Start:End,:]
    end
end

# verify everything looks right for what got sent
@everywhere if myid()!= 1 println(typeof(Data)) end
@everywhere if myid()!= 1 println(size(Data)) end

Option 3:

for (idx, pid) in enumerate(workers())
    Start = (idx-1)*25 + 1
    End = Start + 24
    sendto(pid, Start = Start, End = End)
end

@everywhere if myid()!= 1 Data = readcsv("path/to/data.csv")[Start:End,:] end

Upvotes: 1
