xave

Reputation: 11

Parallel Computing with SharedArray

I am trying to implement a simple parallel algorithm in Julia for a multiply-add operation on a matrix. This is more for understanding how to do parallel computations in Julia than for practicality.

Ultimately, this should implement C = C + A*B, where A, B, and C are matrices. The code I am using is:

function MultiplyAdd!(C::SharedArray{Int64,2}, A::SharedArray{Int64,2}, B::SharedArray{Int64,2})
    @assert size(A,2) == size(B,1)

    p = size(A,1) #rows(A), rows(C)
    m = size(A,2) #cols(A), rows(B)
    n = size(B,2) #cols(B), cols(C)

    # thresh is some threshold based on the number of operations required
    # to compute C before we should switch to parallel computation.
    thresh = 190
    M = 2*p*m*n

    if M < thresh
        C[:,:] += A*B # base case; plain C += A*B would only rebind the local name
    elseif n >= max(p,m)
        # Partition C and B (vertical split):
        # MultiplyAdd!( C0, A, B0 )
        # MultiplyAdd!( C1, A, B1 )
        half = cld(n,2) # ceiling division covers even and odd n alike
        a = @spawn MultiplyAdd!( C[:,1:half], A, B[:,1:half] )
        b = @spawn MultiplyAdd!( C[:,half+1:n], A, B[:,half+1:n] )
        fetch(a), fetch(b)
    elseif p >= m
        # Partition C and A (horizontal split):
        # MultiplyAdd!( C0, A0, B )
        # MultiplyAdd!( C1, A1, B )
        half = cld(p,2)
        a = @spawn MultiplyAdd!( C[1:half,:], A[1:half,:], B )
        b = @spawn MultiplyAdd!( C[half+1:p,:], A[half+1:p,:], B )
        fetch(a), fetch(b)
    else
        # Begin serial recursion:
        # A is split vertically, B horizontally
        # MultiplyAdd!( C, A0, B0 )
        # MultiplyAdd!( C, A1, B1 )
        half = cld(m,2)
        MultiplyAdd!( C, A[:,1:half], B[1:half,:] )
        MultiplyAdd!( C, A[:,half+1:m], B[half+1:m,:] )
    end
end

First, this doesn't work at all. I get the following error when I run it:

LoadError: On worker 5:
UndefVarError: #MultiplyAdd! not defined
 in deserialize_datatype at ./serialize.jl:823
 in handle_deserialize at ./serialize.jl:571
 in collect at ./array.jl:307
 in deserialize at ./serialize.jl:588
 in handle_deserialize at ./serialize.jl:581
 in deserialize at ./serialize.jl:541
 in deserialize_datatype at ./serialize.jl:829
 in handle_deserialize at ./serialize.jl:571
 in deserialize_msg at ./multi.jl:120
 in message_handler_loop at ./multi.jl:1317
 in process_tcp_streams at ./multi.jl:1276
 in #618 at ./event.jl:68
while loading In[32], in expression starting on line 73

 in #remotecall_fetch#606(::Array{Any,1}, ::Function, ::Function, ::Base.Worker, ::Base.RRID, ::Vararg{Any,N}) at ./multi.jl:1070
 in remotecall_fetch(::Function, ::Base.Worker, ::Base.RRID, ::Vararg{Any,N}) at ./multi.jl:1062
 in #remotecall_fetch#609(::Array{Any,1}, ::Function, ::Function, ::Int64, ::Base.RRID, ::Vararg{Any,N}) at ./multi.jl:1080
 in remotecall_fetch(::Function, ::Int64, ::Base.RRID, ::Vararg{Any,N}) at ./multi.jl:1080
 in call_on_owner(::Function, ::Future, ::Int64, ::Vararg{Int64,N}) at ./multi.jl:1130
 in fetch(::Future) at ./multi.jl:1156
 in MultiplyAdd!(::SharedArray{Int64,2}, ::SharedArray{Int64,2}, ::SharedArray{Int64,2}) at ./In[32]:24

Second, it seems that I should not spawn two tasks, since the spawning process then just sits idle waiting on both. Instead, the second call in each of these cases should be a plain MultiplyAdd!(C,A,B) call that runs in the current process. In other words, just assign a and fetch(a).
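Something like this is what I have in mind for the vertical-split branch (an untested sketch; half is the column midpoint as in the code above):

half = cld(n,2)
a = @spawn MultiplyAdd!( C[:,1:half], A, B[:,1:half] ) # one half on a worker
MultiplyAdd!( C[:,half+1:n], A, B[:,half+1:n] )        # other half in this process
fetch(a)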

Third, Julia passes arrays to functions by reference, so wouldn't all of the operations naturally operate on the same C, A, and B matrices? As it stands, taking a slice such as:

C0 = C[:, 1:div(p,2)]
C1 = C[:, div(p,2)+1:p]

creates an entirely new object, which is why I take the slices inside the function calls in the code above: by avoiding assignment I am trying to always operate on the same object. There must be a better way to do that than what I have implemented. Ultimately, I want to operate on the same data in memory and just "move a magnifying glass over the array" to operate on subsets of it in parallel.
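For example, view seems closer to the "magnifying glass" I want, since a SubArray shares the parent's memory instead of copying it (untested sketch):

C0 = view(C, :, 1:div(p,2)) # a window into C, no copy
C0[1,1] += 1                # writes through to C itself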

Upvotes: 1

Views: 415

Answers (1)

Kevin L. Keys

Reputation: 995

It is difficult to help you here because you have not really asked a question. At the risk of sounding condescending, I suggest that you peek at suggestions for asking a good SO question.

As for your code, there are several problems with your approach.

  1. As coded, MultiplyAdd! only parallelizes to a maximum of two workers.
  2. MultiplyAdd! performs several calls like A[:,1:n] which allocate new arrays.
  3. In that vein, calls like A[:,1:n] produce Array objects, not SharedArray objects, so recursive calls to MultiplyAdd! with arguments strictly typed to SharedArray{Int64,2} will not work (see the snippet after this list).
  4. Lastly, MultiplyAdd! does not respect the indexing scheme of SharedArray objects.
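You can check the third point directly in the REPL:

julia> A = SharedArray(Int64, (4,4));

julia> typeof(A[:, 1:2])
Array{Int64,2}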

The last point is important: asking worker 1 to access the parts of A and B assigned to worker 2 requires data transfer, which kills any performance gain from parallelizing your code. You can see how the indices are assigned by running

for i in procs(A)
    @show i, remotecall_fetch(localindexes, i, A) # worker i's chunk of indices
end

on your SharedArray object A. Each worker i should ideally operate only on its own local indices, though it can be helpful to allow data sharing at local index boundaries to save yourself some bookkeeping headache.

If you insist on using SharedArrays for your prototype, then you still have options. The SharedArray documentation has some good suggestions. I have found that the construct

function f(g, S::SharedArray)
    @sync begin
        for p in procs(S)
            @async begin
                remotecall_fetch(g, p, S, p) # run the kernel g on worker p
            end
        end
    end
    S
end

with some kernel function g (e.g. MultiplyAdd!) will typically parallelize operations nicely across all participating workers. Obviously you must decide on how to partition the execution; the advection_shared! example in the docs is a good guide.
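As a concrete illustration, here is an untested sketch that adapts the pattern above to pass all three arrays; multiplyadd_chunk! and the row-splitting arithmetic are my own naming and partitioning choices, not something from the docs. Each worker updates a disjoint block of rows of C, so no locking is needed beyond the enclosing @sync:

@everywhere function multiplyadd_chunk!(C, A, B, p)
    nw = length(procs(C))
    me = findfirst(procs(C), p)            # this worker's slot among procs(C)
    lo = div((me - 1) * size(C, 1), nw) + 1
    hi = div(me * size(C, 1), nw)          # worker p owns rows lo:hi
    C[lo:hi, :] += A[lo:hi, :] * B         # disjoint rows, so no races
    C
end

function MultiplyAddShared!(C, A, B)
    @sync for p in procs(C)
        @async remotecall_fetch(multiplyadd_chunk!, p, C, A, B, p)
    end
    C
end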

You may also consider using Julia's native multithreading. That framework keeps shared memory but replaces worker processes with threads in a single process, and it allows you to operate on Array objects directly with familiar iteration constructs.
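For instance, a threaded multiply-add over plain Array objects might look like the following untested sketch (start Julia with the JULIA_NUM_THREADS environment variable set to use more than one thread):

using Base.Threads

function MultiplyAddThreaded!(C::Matrix, A::Matrix, B::Matrix)
    @threads for j in 1:size(C, 2) # threads split the columns of C
        for k in 1:size(A, 2), i in 1:size(C, 1)
            C[i, j] += A[i, k] * B[k, j]
        end
    end
    C
end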

Upvotes: 1
