user2336525
user2336525

Reputation: 55

How to implement the MapReduce example in Erlang efficiently?

I am trying to compare the performance of concurrent programming languages such as Haskell, Go and Erlang. The following Go code calculates the sum of squares (repeating the calculation R times):

$1^2 + 2^2 + 3^2 + \cdots + 1024^2$

package main

import "fmt"

// mapper squares every value received on in and sends the square to out.
// It returns once in has been closed and drained.
func mapper(in chan int, out chan int) {
	for {
		value, ok := <-in
		if !ok {
			return
		}
		out <- value * value
	}
}

// reducer repeatedly takes one value from in1, then one from in2, and sends
// their sum to out. It returns once in1 has been closed and drained.
func reducer(in1, in2 chan int, out chan int) {
	for {
		left, ok := <-in1
		if !ok {
			return
		}
		right := <-in2
		out <- left + right
	}
}

// main wires up a binary reduction tree of goroutines and pushes the numbers
// 1..N through it R times, so R sums of squares 1^2+...+N^2 arrive on r[1].
//
// Channel layout (heap-style indexing):
//   - r[1] is the root output; reducer i reads r[2i] and r[2i+1] and writes r[i];
//   - r[N..2N-1] are the leaf channels, each written by one mapper;
//   - r[0] is never used.
//
// The last sum is printed, which also makes the program use its "fmt"
// import; Go refuses to compile a file with an unused import.
func main() {
	const N = 1024 // calculate sum of squares up to N; N must be power of 2
	const R = 10   // number of repetitions to fill the "pipe"

	var r [N * 2]chan int
	for i := range r {
		r[i] = make(chan int)
	}
	var m [N]chan int
	for i := range m {
		m[i] = make(chan int)
	}

	for i := 0; i < N; i++ {
		go mapper(m[i], r[i+N])
	}
	for i := 1; i < N; i++ {
		go reducer(r[i*2], r[i*2+1], r[i])
	}

	// Feed from a separate goroutine so that successive repetitions can
	// overlap inside the tree while main drains the root.
	go func() {
		for j := 0; j < R; j++ {
			for i := 0; i < N; i++ {
				m[i] <- i + 1
			}
		}
	}()

	var sum int
	for j := 0; j < R; j++ {
		sum = <-r[1]
	}
	fmt.Printf("%d x %d\n", R, sum)
}

The following code is my MapReduce solution in Erlang. I am new to Erlang and would like to compare the performance of Go, Haskell and Erlang, so my question is how to optimize this Erlang code. I compile it with `erlc -W mr.erl` and run it with `erl -noshell -s mr start -s init stop -extra 1024 1024` (the two arguments are N and R, in that order). Are there any special compile or runtime options available for optimization? I really appreciate any help you can provide.

-module(mr).
-export([start/0, create/2, doreduce/2, domap/1, repeat/3]).

%% Command-line entry point for `erl ... -extra N R`: N is how many numbers
%% to square, R how many rounds to run. Both are handed to create/2.
start() ->
    [NumText | Rest] = init:get_plain_arguments(),
    Num = list_to_integer(NumText),
    [RoundsText | _] = Rest,
    Rounds = list_to_integer(RoundsText),
    create(Rounds, Num).

%% Build the map/reduce process network for the numbers 1..Num, run R rounds
%% through it, then stop every worker process. Returns the value of repeat/3.
%%
%% Reducers are laid out in heap order: reducer 1 is the root, reducer K has
%% children 2K and 2K+1, and reducers Num..2Num-1 are the leaves, each fed by
%% one mapper. Every reducer gets self() as the place to report {result, Sum}.
%%
%% Num must be at least 2: the tree is wired by reducer_connect(Num - 1, ...),
%% which cannot handle an Index of 0. R must be a non-negative integer, or
%% repeat/3 never reaches its terminating clause.
create(R, Num) when is_integer(R), R >= 0, is_integer(Num), Num > 1 ->
    Self = self(),
    %% Linked, so the workers do not outlive a crashed coordinator.
    Reducers = [spawn_link(?MODULE, doreduce, [Index, Self])
                || Index <- lists:seq(1, 2 * Num - 1)],
    Mappers = [spawn_link(?MODULE, domap, [In]) || In <- lists:seq(1, Num)],
    reducer_connect(Num - 1, Reducers, Self),
    mapper_connect(Num, Num, Reducers, Mappers),
    Result = repeat(R, Num, Mappers),
    %% Every worker returns (exits normally) on a bare `true` message. Without
    %% this, each call would leave 3*Num-1 idle processes behind.
    lists:foreach(fun(Pid) -> Pid ! true end, Mappers ++ Reducers),
    Result.

%% Run R rounds. Each round sends every mapper its number and then waits for
%% the root reducer's {result, Sum}. Rounds are strictly sequential: the next
%% batch is only sent after the previous result has arrived. The sums
%% themselves are discarded. Returns true.
%%
%% R = 0 performs no round at all, so exactly R rounds are run in total.
repeat(0, _Num, _Mappers) ->
    true;
repeat(R, Num, Mappers) when is_integer(R), R > 0 ->
    send_message(Num, Mappers),
    receive
        {result, _Sum} ->
            true
    end,
    repeat(R - 1, Num, Mappers).
%% Send {mapper, K} to the K-th mapper for every K in 1..Num, counting down
%% from Num to 1. Returns the last message sent, {mapper, 1}.
%%
%% The mapper list is walked once. Looking each mapper up with lists:nth/2
%% would cost O(Num^2) per round.
send_message(Num, Mappers) when is_integer(Num), Num >= 1 ->
    Targets = lists:zip(lists:seq(1, Num), lists:sublist(Mappers, Num)),
    lists:foldl(fun({Index, Mapper}, _Previous) -> Mapper ! {mapper, Index} end,
                undefined,
                lists:reverse(Targets)).

%% Wire the reducer tree. RList holds the reducer pids in heap order: position
%% K has children 2K and 2K+1. Index is the highest internal position to wire.
%% For parents Index, Index-1, ..., 1, each child is sent {connect, ParentPid}.
%% Finally the root (position 1) is sent {connect, Root}. Returns that last
%% message.
%%
%% The list is converted to a tuple once, so each lookup is O(1). Calling
%% lists:nth/2 per node made wiring O(Num^2).
reducer_connect(Index, RList, Root) when is_integer(Index), Index >= 1 ->
    Nodes = list_to_tuple(RList),
    lists:foreach(
      fun(Parent) ->
              ParentPid = element(Parent, Nodes),
              element(2 * Parent, Nodes) ! {connect, ParentPid},
              element(2 * Parent + 1, Nodes) ! {connect, ParentPid}
      end,
      lists:seq(Index, 1, -1)),
    element(1, Nodes) ! {connect, Root}.

%% Tell each of the first Index mappers which reducer leaf to report to.
%% Mapper K (1-based) is sent {connect, R}, where R is element Num + K - 1 of
%% RList. Messages go out for mappers Index down to 1. Returns the last
%% message sent, the one addressed to mapper 1.
%%
%% Both lists are walked once. Calling lists:nth/2 per mapper made wiring
%% O(Num^2).
mapper_connect(Index, Num, RList, MList) when is_integer(Index), Index > 0 ->
    Leaves = lists:sublist(lists:nthtail(Num - 1, RList), Index),
    Pairs = lists:zip(lists:sublist(MList, Index), Leaves),
    lists:foldl(fun({Mapper, Leaf}, _Previous) -> Mapper ! {connect, Leaf} end,
                undefined,
                lists:reverse(Pairs)).

%% Reducer process, phase 1: wait for {connect, Parent}, then start combining
%% values. Index is this node's position in the heap-ordered reducer tree (1 is
%% the root; K has children 2K and 2K+1). CurId is where the root delivers
%% {result, Sum}.
doreduce(Index, CurId) ->
    receive
        {connect, Parent} ->
            doreduce(Index, Parent, none, none, CurId)
    end.

%% Reducer process, phase 2. Val1 and Val2 hold a pending value from the left
%% child ({reduce1, _}) or the right child ({reduce2, _}). The atom `none`
%% marks an empty slot, so a partial sum of 0 or a negative number is still
%% treated as a real value.
%%
%% A second value from a side that is already pending matches no clause. It
%% therefore stays in the mailbox until the current pair is complete, which
%% keeps successive rounds from mixing. A bare `true` message ends the process.
doreduce(Index, To, Val1, Val2, Root) ->
    receive
        {map, Val} ->
            %% Leaf node: pass the mapper's value straight up.
            send_up(Index, To, Root, Val),
            doreduce(Index, To, none, none, Root);
        {reduce1, V1} when Val1 =:= none, Val2 =/= none ->
            send_up(Index, To, Root, V1 + Val2),
            doreduce(Index, To, none, none, Root);
        {reduce2, V2} when Val1 =/= none, Val2 =:= none ->
            send_up(Index, To, Root, Val1 + V2),
            doreduce(Index, To, none, none, Root);
        {reduce1, V1} when Val1 =:= none, Val2 =:= none ->
            doreduce(Index, To, V1, none, Root);
        {reduce2, V2} when Val1 =:= none, Val2 =:= none ->
            doreduce(Index, To, none, V2, Root);
        true ->
            true
    end.

%% Pass a value one level up the tree. The root (Index 1) reports
%% {result, Sum} to Root. Otherwise an even Index is the left child of To
%% (reduce1) and an odd Index the right child (reduce2).
send_up(1, _To, Root, Sum) ->
    Root ! {result, Sum};
send_up(Index, To, _Root, Sum) when Index rem 2 =:= 0 ->
    To ! {reduce1, Sum};
send_up(_Index, To, _Root, Sum) ->
    To ! {reduce2, Sum}.

%% Mapper process, phase 1: block until told which reducer leaf to report
%% to, then continue in domap/2.
domap(Index) ->
    receive {connect, Leaf} -> domap(Index, Leaf) end.

%% Mapper process, phase 2: for every {mapper, V} send {map, V*V} to To and
%% keep looping. A bare `true` message ends the process.
domap(Index, To) ->
    receive
        true ->
            true;
        {mapper, Value} ->
            Square = Value * Value,
            To ! {map, Square},
            domap(Index, To)
    end.

Upvotes: 0

Views: 819

Answers (1)

Hynek -Pichi- Vychodil
Hynek -Pichi- Vychodil

Reputation: 26121

Although this is not a good task for Erlang at all, there is a fairly simple solution:

-module(mr).

-export([start/1, start/2]).

%% Entry point for `erl -run mr start R N`. Parses both arguments, runs the
%% pipeline and prints "<number of results> x <first result>".
start([R, N]) ->
    Rounds = list_to_integer(R),
    Size = list_to_integer(N),
    Sums = start(Rounds, Size),
    io:format("~B x ~B~n", [length(Sums), hd(Sums)]).

%% Run the pipeline: spawn a reduction tree over 1..N that produces R sums of
%% squares. Returns the R sums reported by its root, in arrival order.
%%
%% N must be at least 1. For N < 1, splitting 1..N in start/4 never reaches
%% the From =:= To base case, so processes would be spawned without bound.
%% R must be a non-negative integer because it drives lists:seq(1, R).
start(R, N) when is_integer(R), R >= 0, is_integer(N), N >= 1 ->
    Self = self(),
    Reducer = start(Self, R, 1, N),
    [receive {Reducer, Result} -> Result end || _ <- lists:seq(1, R)].

%% Spawn (linked) the node covering From..To. A single number becomes a
%% mapper; a wider range becomes a reducer that splits it further.
%% Returns the new pid.
start(Parent, R, From, To) when From =:= To ->
    spawn_link(fun() -> mapper(Parent, R, From) end);
start(Parent, R, From, To) ->
    spawn_link(fun() -> reducer(Parent, R, From, To) end).

%% Leaf: send {MyPid, N*N} to Parent R times. Returns the list of messages
%% sent.
mapper(Parent, R, N) ->
    Me = self(),
    lists:map(fun(_) -> Parent ! {Me, N * N} end, lists:seq(1, R)).

%% Internal node covering From..To. It splits the range in half and spawns a
%% subtree for each half. Then, R times, it waits for one partial sum from the
%% left subtree, then one from the right, and sends their total to Parent
%% tagged with this process's pid. Returns the list of messages sent.
reducer(Parent, R, From, To) ->
    Self = self(),
    Middle = (From + To) div 2,
    Left = start(Self, R, From, Middle),
    Right = start(Self, R, Middle + 1, To),
    Round =
        fun(_) ->
                LeftSum = receive {Left, X} -> X end,
                RightSum = receive {Right, Y} -> Y end,
                Parent ! {Self, LeftSum + RightSum}
        end,
    lists:map(Round, lists:seq(1, R)).

You can run it using

$ erlc -W mr.erl
$ time erl -noshell -run mr start 1024 1024 -s init stop
1024 x 358438400

real    0m2.162s
user    0m4.177s
sys     0m0.151s

But most of that time is VM start-up and graceful-shutdown overhead:

$ time erl -noshell -run mr start 1024 1024 -s erlang halt
1024 x 358438400

real    0m1.172s
user    0m4.110s
sys     0m0.150s

$ erl
1> timer:tc(fun() -> mr:start(1024,1024) end).
{978453,
 [358438400,358438400,358438400,358438400,358438400,
  358438400,358438400,358438400,358438400,358438400,358438400,
  358438400,358438400,358438400,358438400,358438400,358438400,
  358438400,358438400,358438400,358438400,358438400,358438400,
  358438400,358438400,358438400,358438400|...]}

Keep in mind that this is more an elegant solution than an efficient one. An efficient solution would balance the branching factor of the reduction tree against the communication overhead.

Upvotes: 3

Related Questions