I've been learning Riak and ran into an issue with MapReduce. My MapReduce functions work fine when the bucket holds about 15 records or fewer, but beyond that the query fails with a stack trace. I'm new to both Riak and Erlang, so I'm unsure whether the problem is in my code or in Riak. Any advice on how to debug this, or on what the problem is, would be appreciated!
-module(identity_map).
-export([identity/3]).
% Map phase: Values is the Riak object; decode its JSON value into a proplist.
identity(Values, _, _) ->
    JsonData = riak_object:get_values(Values),
    {struct, Objects} = mochijson2:decode(JsonData),
    [Objects].
-module(stocks_summary).
-export([average_high/2]).
% Reduce phase: average the high_d field of the records produced by the map phase
average_high(Values, _) ->
    Total = lists:foldl(
        fun(Record, Accum) ->
            High = proplists:get_value(<<"high_d">>, Record),
            Accum + High
        end,
        0,
        Values),
    [Total / length(Values)].
curl -XPOST http://192.168.0.126:8098/mapred \
-H 'Content-Type: application/json' \
-d '{"inputs": ["stocks","goog"], "query": [{"map":{"language":"erlang","module":"identity_map","function":"identity"}}, {"reduce":{"language":"erlang","module":"stocks_summary","function":"average_high"}} ]}'
Date,Open,High,Low,Close,Volume,Adj Close
2010-05-05,500.98,515.72,500.47,509.76,4566900,509.76
2010-05-04,526.52,526.74,504.21,506.37,6076300,506.37
2010-05-03,526.50,532.92,525.08,530.60,1857800,530.60
2010-04-30,531.13,537.68,525.44,525.70,2435400,525.70
2010-04-29,533.37,536.50,526.67,532.00,3058900,532.00
2010-04-28,532.10,534.83,521.03,529.19,3406100,529.19
2010-04-27,528.95,538.33,527.23,529.06,3844700,529.06
2010-04-26,544.97,544.99,529.21,531.64,4368800,531.64
2010-04-23,547.25,549.32,542.27,544.99,2089400,544.99
2010-04-22,552.00,552.50,543.35,547.06,3280700,547.06
2010-04-21,556.46,560.25,552.16,554.30,2391500,554.30
2010-04-20,554.17,559.66,551.06,555.04,2977400,555.04
2010-04-19,548.75,553.99,545.00,550.10,3894000,550.10
2010-04-16,563.00,568.81,549.63,550.15,12235500,550.15
2010-04-15,592.17,597.84,588.29,595.30,6716700,595.30
2010-04-14,590.06,592.34,584.01,589.00,3402700,589.00
2010-04-13,572.53,588.88,571.13,586.77,3845200,586.77
2010-04-12,567.35,574.00,566.22,572.73,2352400,572.73
2010-04-09,567.49,568.77,564.00,566.22,2056600,566.22
2010-04-08,563.32,569.85,560.05,567.49,1947500,567.49
2010-04-07,567.30,568.75,561.86,563.54,2581000,563.54
{
"date_s": "2010-04-07",
"open_d": 567.3,
"high_d": 568.75,
"low_d": 561.86,
"close_d": 563.54,
"volume_i": 2581000,
"adjClose_d": 563.54
}
{
"phase": 1,
"error": "{function_clause,[{proplists,get_value,[<<\"high_d\">>,555.621,undefined],[{file,\"proplists.erl\"},{line,225}]},{stocks_summary,'-average_high/2-fun-0-',2,[{file,\"stocks_summary.erl\"},{line,9}]},{lists,foldl,3,[{file,\"lists.erl\"},{line,1248}]},{stocks_summary,average_high,2,[{file,\"stocks_summary.erl\"},{line,7}]},{riak_kv_w_reduce,reduce,3,[{file,\"src/riak_kv_w_reduce.erl\"},{line,207}]},{riak_kv_w_reduce,done,1,[{file,\"src/riak_kv_w_reduce.erl\"},{line,170}]},{riak_pipe_vnode_worker,wait_for_input,...},...]}",
"input": null,
"type": null,
"stack": null
}
From the stack trace, it looks like the reduce function is being applied to a plain number (555.621) instead of a proplist (a list of tuples). What makes this strange is that it works fine when I only put 10-15 records into the bucket.
The problem is with the reduce phase. The map phase is spread around the cluster and executed by many vnodes, which forward their map results to the node that will run the reduce. Since these results are not expected to arrive simultaneously, the reduce function may be run multiple times; on each subsequent run, its input is the concatenation of the previous reduce result and the newly arrived map-phase results.
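This means that on the second run, your reduce function receives the previous average as a plain number, as the first element of the list, with the rest of the list being the JSON objects/proplists you expect. That number is the 555.621 in your stack trace: proplists:get_value has no clause that accepts a number in place of a list, hence the function_clause error. With only a few records, all map results presumably reach the reduce together, so the function runs only once and the bug stays hidden.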
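To make the re-run behaviour concrete, here is a small Python model (not Riak code) of the question's reduce being fed map results in batches. Records are modelled as dicts instead of Erlang proplists, and the helper `run_reduce_in_batches` and the two-batch split are hypothetical, chosen only to mimic the "previous result plus newly arrived results" concatenation; how Riak actually decides batch boundaries is not modelled.

```python
# Python model (not Riak code) of the question's average_high/2.
# Dicts stand in for Erlang proplists; the batching helper is hypothetical.


def naive_average_high(values):
    """Mirrors the question's reduce: assumes every input is a raw record."""
    total = sum(record["high_d"] for record in values)
    return [total / len(values)]


def run_reduce_in_batches(reduce_fn, batches):
    """Call reduce_fn on the previous output concatenated with each new batch."""
    acc = []
    for batch in batches:
        acc = reduce_fn(acc + batch)
    return acc


records = [{"high_d": h} for h in (515.72, 526.74, 532.92, 537.68)]

# One batch: the reduce runs once and returns the average.
print(run_reduce_in_batches(naive_average_high, [records]))

# Two batches: the second call sees the previous average (a float) as its
# first element, and looking up "high_d" on a float raises TypeError.
try:
    run_reduce_in_batches(naive_average_high, [records[:2], records[2:]])
except TypeError as exc:
    print("second reduce failed:", exc)
```

The single-batch call succeeds; the two-batch call fails for the same reason the Erlang version hits `function_clause`.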
To fix this, have your reduce function return a proplist containing the current average and the number of values seen so far. Below is one possibility; note that it returns the final result of the MapReduce as an object/proplist rather than a bare number.
average_high(Values, _) ->
    {Count, Total} = lists:foldl(
        fun(Record, {Cnt, Tot}) ->
            case proplists:get_value(<<"average">>, Record, undefined) of
                undefined ->
                    % A raw record from the map phase
                    High = proplists:get_value(<<"high_d">>, Record),
                    {Cnt + 1, Tot + High};
                Ave ->
                    % An earlier reduce result: weight its average by its count
                    C = proplists:get_value(<<"count">>, Record, 1),
                    {Cnt + C, Tot + (Ave * C)}
            end
        end,
        {0, 0},
        Values),
    [[{<<"average">>, Total / Count}, {<<"count">>, Count}]].
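The same fix, modelled in Python under the same assumptions (dicts for proplists, a hypothetical batching helper): because each partial result carries its count, it can be folded back in with the correct weight, so the average should not depend on how the inputs are split into batches.

```python
# Python model (not Riak code) of the answer's average_high/2.
# Dicts stand in for Erlang proplists; run_reduce_in_batches is a
# hypothetical helper that mimics re-running the reduce on
# "previous result ++ newly arrived map results".


def average_high(values):
    """Accept raw records and earlier reduce outputs in the same list."""
    count, total = 0, 0.0
    for record in values:
        ave = record.get("average")
        if ave is None:
            # A raw record from the map phase.
            count += 1
            total += record["high_d"]
        else:
            # An earlier reduce result: weight its average by its count
            # (default 1, like proplists:get_value(<<"count">>, Record, 1)).
            n = record.get("count", 1)
            count += n
            total += ave * n
    return [{"average": total / count, "count": count}]


def run_reduce_in_batches(reduce_fn, batches):
    """Call reduce_fn on the previous output concatenated with each new batch."""
    acc = []
    for batch in batches:
        acc = reduce_fn(acc + batch)
    return acc


records = [{"high_d": h} for h in (515.72, 526.74, 532.92, 537.68)]

# However the records are batched, the results should agree
# (up to floating-point rounding).
print(run_reduce_in_batches(average_high, [records]))
print(run_reduce_in_batches(average_high, [records[:2], records[2:]]))
print(run_reduce_in_batches(average_high, [[r] for r in records]))
```

Without the count, averaging partial averages would weight every batch equally regardless of its size. Like the Erlang version, this sketch divides by zero if it is ever called with an empty list.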
The phase function docs specify:
A reduce function should produce a list of values, but it must also be true that the function is commutative, associative, and idempotent. That is, if the input list [a,b,c,d] is valid for a given F, then all of the following must produce the same result:
F([a,b,c,d])
F([a,d] ++ F([c,b]))
F([F([a]),F([c]),F([b]),F([d])])
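A quick way to check a reduce function against these rules is to evaluate the forms and compare the results. The Python sketch below does this for a model of the answer's reduce (dicts stand in for proplists); it reads the nested F([...]) calls as list concatenation, which matches how the re-reduce input is built, and it also checks idempotence by re-reducing a finished result.

```python
import math


# Python model (not Riak code) of the answer's reduce, repeated here so the
# block is self-contained. Dicts stand in for Erlang proplists.
def average_high(values):
    count, total = 0, 0.0
    for record in values:
        ave = record.get("average")
        if ave is None:
            count += 1
            total += record["high_d"]
        else:
            n = record.get("count", 1)
            count += n
            total += ave * n
    return [{"average": total / count, "count": count}]


F = average_high  # short alias to mirror the notation in the docs
a, b, c, d = ({"high_d": h} for h in (515.72, 526.74, 532.92, 537.68))

# Python list concatenation (+) plays the role of Erlang's ++.
forms = {
    "F([a,b,c,d])": F([a, b, c, d]),
    "F([a,d] ++ F([c,b]))": F([a, d] + F([c, b])),
    "F(F([a]) ++ F([c]) ++ F([b]) ++ F([d]))": F(F([a]) + F([c]) + F([b]) + F([d])),
    # Idempotence: re-reducing a finished result should not change it.
    "F(F([a,b,c,d]))": F(F([a, b, c, d])),
}

reference = forms["F([a,b,c,d])"][0]
for name, result in forms.items():
    same = (result[0]["count"] == reference["count"]
            and math.isclose(result[0]["average"], reference["average"]))
    print(f"{name}: matches={same}")
```

Running the question's original reduce through the same check fails on the second form, because F([c,b]) yields a bare number.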