Reputation: 966
I have a cluster of 4 Riak nodes loaded with data. I am trying to run a simple MapReduce job that just aggregates, but I want to do it with my own JavaScript functions (so that I can then move on to more involved MapReduce jobs).
My relevant Java snippet is:
IndexQuery iq = new IntRangeQuery(IntIndex.named(indexId), bucketId, 11, 40);
Function mapfunc = new JSSourceFunction(
    streamToString(MapReduceDriver.class.getResourceAsStream("/map_1.js")));
Function redfunc = new JSSourceFunction(
    streamToString(MapReduceDriver.class.getResourceAsStream("/reduce_1.js")));
PBMapReduceResult result = (PBMapReduceResult) riakClient.mapReduce(iq)
    .addMapPhase(mapfunc)
    .addReducePhase(redfunc)
    .execute();
The two JavaScript functions are:
function map_keepAttr(value, keyData, arg) {
    var data = Riak.mapValuesJson(value)[0];
    return [ data.Attribute_17 ];
}
function reduce_aggregate(values, arg) {
    return [values.length];
}
The problem I am seeing is the following: my query and the map phase produce exactly 30 values, but the reduce phase reports 3 instead of 30, so it is not counting correctly. Even stranger, when I use the following reduce function:
function reduce_aggregate(values, arg) {
    return values;
}
I am getting the expected result i.e. a json array containing exactly 30 entries.
Any help would be much appreciated, because I don't seem to understand how MapReduce works in Riak.
Thanks!
Upvotes: 0
Views: 316
Reputation: 1665
I suspect the issue you are seeing might be caused by not accounting for re-reduce in your reduce phase function.
While map phase functions are executed once per record, reduce phase functions are not necessarily executed once with the full data set as input, but instead recursively over portions of the map phase output until all records have been processed. The result created by the first run of the reduce function will be included in the array sent to the next call.
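To make this concrete, here is a small simulation in plain JavaScript that runs outside Riak. The `simulateReduce` helper and the batch size of 10 are my own assumptions for illustration; Riak's real batching is decided by the server and will differ. It only shows how a reduce function that returns `[values.length]` can end up reporting 3 for 30 inputs.

```javascript
// The reduce function from the question: counts the entries it is handed.
function reduce_aggregate(values, arg) {
  return [values.length];
}

// Hypothetical stand-in for Riak's batching: reduce each chunk of the map
// output, then run the reduce function again over the partial results.
function simulateReduce(reduceFn, inputs, batchSize) {
  var partials = [];
  for (var i = 0; i < inputs.length; i += batchSize) {
    partials = partials.concat(reduceFn(inputs.slice(i, i + batchSize)));
  }
  return reduceFn(partials);
}

// 30 placeholder values standing in for the map phase output.
var mapOutput = [];
for (var n = 0; n < 30; n++) {
  mapOutput.push("attr" + n);
}

var result = simulateReduce(reduce_aggregate, mapOutput, 10);
console.log(result); // [ 3 ]
```

Each batch of 10 produces `[10]`, and the final call then sees `[10, 10, 10]` and counts three entries rather than adding them up.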
In order to count the number of items using a reduce function, you need to either distinguish the results of previous reduce calls from map phase output, or ensure that both have the same format and can be aggregated correctly regardless of where the data comes from.
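One way to follow the second approach, sketched under the assumption that you only need the count: have the map phase emit `1` per record and have the reduce phase sum its inputs, so that partial sums and fresh map output look the same. The names `map_count`, `reduce_sum` and `simulateReduce` are mine, and the simulation below only imitates re-reduce locally; it is not how Riak actually schedules the calls.

```javascript
// Map phase: emit a 1 for every record instead of the attribute value.
function map_count(value, keyData, arg) {
  return [1];
}

// Reduce phase: sum the inputs. A partial sum from an earlier reduce call
// is just another number, so re-reduce gives the same total.
function reduce_sum(values, arg) {
  var total = 0;
  for (var i = 0; i < values.length; i++) {
    total += values[i];
  }
  return [total];
}

// Hypothetical local imitation of re-reduce with a chosen batch size.
function simulateReduce(reduceFn, inputs, batchSize) {
  var partials = [];
  for (var i = 0; i < inputs.length; i += batchSize) {
    partials = partials.concat(reduceFn(inputs.slice(i, i + batchSize)));
  }
  return reduceFn(partials);
}

// 30 records, each mapped to [1].
var mapped = [];
for (var n = 0; n < 30; n++) {
  mapped = mapped.concat(map_count(null, null, null));
}

console.log(simulateReduce(reduce_sum, mapped, 10)); // [ 30 ]
console.log(simulateReduce(reduce_sum, mapped, 7));  // [ 30 ]
```

The total stays at 30 whatever the batch size, because summing is insensitive to how the input is split up.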
Upvotes: 1