Gianmario Spacagna

Reputation: 1300

Java Hadoop - Reducer receives different values for the same key multiple times when using Combiner class

I wrote a MapReduce job with the Hadoop Java API (1.0.3). The job consists of summing all the values of a particular field (X) in my data and creating a weighted distribution of the other fields.

INPUT:

1 field1_1 field2_1 field3_1 ... fieldX_1
2 field1_2 field2_2 field3_2 ... fieldX_2
3 field1_3 field2_3 field3_3 ... fieldX_3

Since one pair is emitted for every line in my data, and I need a single reducer to sum up all the values, I decided to set the same Reduce class as the Combiner.

Total X summing:

MAP OUTPUT:

X fieldX_1
X fieldX_2 
X fieldX_3
X ... 

REDUCE OUTPUT:

X fieldX_1+fieldX_2+fieldX_3+...
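
For reference, this is roughly how the job is wired up (a minimal sketch with illustrative names, not my exact code; it assumes fieldX is the last whitespace-separated column of each line):

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class XSumJob {

  // Emits (X, fieldX) for every input line; fieldX is assumed to be the last column.
  public static class XMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, Text, FloatWritable> {
    private static final Text KEY = new Text("X");

    public void map(LongWritable offset, Text line,
                    OutputCollector<Text, FloatWritable> output, Reporter reporter)
        throws IOException {
      String[] fields = line.toString().split("\\s+");
      float x = Float.parseFloat(fields[fields.length - 1]);
      output.collect(KEY, new FloatWritable(x));
    }
  }

  // Sums all values for a key; registered both as Combiner and as Reducer.
  public static class SumReducer extends MapReduceBase
      implements Reducer<Text, FloatWritable, Text, FloatWritable> {
    public void reduce(Text key, Iterator<FloatWritable> values,
                       OutputCollector<Text, FloatWritable> output, Reporter reporter)
        throws IOException {
      float sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new FloatWritable(sum));
    }
  }

  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(XSumJob.class);
    conf.setMapperClass(XMapper.class);
    conf.setCombinerClass(SumReducer.class);   // same class as the reducer
    conf.setReducerClass(SumReducer.class);
    conf.setNumReduceTasks(1);                 // a single reducer sums everything
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(FloatWritable.class);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    JobClient.runJob(conf);
  }
}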

The weird thing that happens is that the combiner/reducer receives the same key multiple times:

X [fieldX_1 fieldX_1 fieldX_1 ... fieldX_1]
X [fieldX_2 fieldX_2 fieldX_2 ...]
X [fieldX_3 fieldX_3 fieldX_3 ...]
X ...

I am sure about this because I am logging to stderr all the values that are passed to each invocation of the reduce method, for debugging.
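
The instrumentation looks roughly like this (a sketch, not my exact code; note that Hadoop reuses the same FloatWritable instance across calls to the value iterator, so the primitive value is copied out before being collected for logging):

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Logger;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Sum reducer instrumented to log every invocation's input and output.
public class LoggingSumReducer extends MapReduceBase
    implements Reducer<Text, FloatWritable, Text, FloatWritable> {
  private static final Logger LOG = Logger.getLogger(LoggingSumReducer.class.getName());

  public void reduce(Text key, Iterator<FloatWritable> values,
                     OutputCollector<Text, FloatWritable> output, Reporter reporter)
      throws IOException {
    // Hadoop reuses the same FloatWritable instance on each next() call,
    // so copy the primitive float out instead of storing references.
    List<Float> seen = new ArrayList<Float>();
    float sum = 0;
    while (values.hasNext()) {
      float v = values.next().get();
      seen.add(v);
      sum += v;
    }
    LOG.warning("REDUCE-INPUT: " + key + "," + seen);
    output.collect(key, new FloatWritable(sum));
    LOG.warning("REDUCE-OUTPUT: " + key + "," + sum);
  }
}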

I want to add a more concrete example:

Data:

1 field1_1 field2_1 field3_1 ... 10
2 field1_2 field2_2 field3_2 ... 20
3 field1_3 field2_3 field3_3 ... 30
4 field1_1 field2_1 field3_1 ... 10
5 field1_2 field2_2 field3_2 ... 40
6 field1_3 field2_3 field3_3 ... 20
...

Map output:

X 10
X 20
X 30
X 10
X 40
X 20 

Reduce input (with combiner):

X [10 10 10 10]
X [20 20 20]
X [30 30 30 30 30 30 30]
X [40 40]

Reduce output (with combiner):

X 40
X 60
X 210
X 80

X is a constant label (the field name). Note that the reducer is called several times with the same key X, each time with a collection of identical values, e.g. [10 10 10 ...] or [30 30 30 ...], and each sum is output separately. The arithmetic itself is correct, but at this stage an additional reduce step would be needed to sum up the duplicates.

Real log example:

Nov 06, 2013 8:50:12 AM MYCLASS logInputError
WARNING: REDUCE-INPUT: X,[10.0]
Nov 06, 2013 8:50:12 AM MYCLASS logOutputError
WARNING: REDUCE-OUTPUT: X,10.0
Nov 06, 2013 8:50:12 AM MYCLASS logInputError
WARNING: REDUCE-INPUT: X,[25.865, 25.865]
Nov 06, 2013 8:50:12 AM MYCLASS logOutputError
WARNING: REDUCE-OUTPUT: X,51.73
Nov 06, 2013 8:50:12 AM MYCLASS logInputError
WARNING: REDUCE-INPUT: X,[1449271.4, 1449271.4, 1449271.4, 1449271.4, 1449271.4, 1449271.4, 1449271.4, 1449271.4, 1449271.4, 1449271.4, 1449271.4, 1449271.4, 1449271.4, 1449271.4]
Nov 06, 2013 8:50:12 AM MYCLASS logOutputError
WARNING: REDUCE-OUTPUT: X,2.0289798E7
Nov 06, 2013 8:50:12 AM MYCLASS logInputError
WARNING: REDUCE-INPUT: X,[514994.53, 514994.53, 514994.53, 514994.53, 514994.53, 514994.53, 514994.53, 514994.53, 514994.53, 514994.53, 514994.53, 514994.53, 514994.53]
Nov 06, 2013 8:50:12 AM MYCLASS logOutputError
WARNING: REDUCE-OUTPUT: X,6694929.0
Nov 06, 2013 8:50:12 AM MYCLASS logInputError
WARNING: REDUCE-INPUT: X,[1438665.5, 1438665.5, 1438665.5, 1438665.5, 1438665.5, 1438665.5, 1438665.5, 1438665.5, 1438665.5, 1438665.5, 1438665.5, 1438665.5, 1438665.5]
Nov 06, 2013 8:50:12 AM MYCLASS logOutputError
WARNING: REDUCE-OUTPUT: X,1.8702654E7

If I remove the Combiner, everything works fine. I understand that the Combiner may be invoked 0, 1, or multiple times, but what about the Reducer? Shouldn't it be invoked exactly once per key?

Even weirder: I repeat a similar procedure for the field distributions, and this happens only for the X summing problem...

Weighted Field Distribution

MAP OUTPUT (example field1):

field1_1 X_1
field1_2 X_2
field1_3 X_3
... 

REDUCE OUTPUT:

field1(class1) fieldX(class1)+fieldX(class1)+fieldX(class1)+...
field1(class2) fieldX(class2)+fieldX(class2)+fieldX(class2)+...
field1(class3) fieldX(class3)+fieldX(class3)+fieldX(class3)+...
...

Essentially, for each value of field1 I am summing up all the associated values of fieldX and repeating the same procedure for several fields (field1, field2, field3...).

For those emitted pairs, the reducer receives the single key (field1(class1)) and the array of values ([fieldX(class1)...]), as it normally should.
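
For completeness, a sketch of the mapper for this part (illustrative names again; field1 is assumed to be the second whitespace-separated column and X the last; the same sum Reduce class is set as Combiner and Reducer here too):

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Emits (field1_value, X) so the reducer can sum X per class label of field1.
public class Field1DistributionMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, FloatWritable> {

  public void map(LongWritable offset, Text line,
                  OutputCollector<Text, FloatWritable> output, Reporter reporter)
      throws IOException {
    String[] fields = line.toString().split("\\s+");
    String field1 = fields[1];                              // class label of field1
    float x = Float.parseFloat(fields[fields.length - 1]);  // weight X
    output.collect(new Text(field1), new FloatWritable(x));
  }
}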

Conclusion

One consideration is that for the X summing problem the single key (X) maps to a number of values equal to the size of the data (the number of lines), whereas for the weighted field distribution the values are spread among the several class labels that each field contains.

Is this a bug in my code, or is there some procedural detail of Hadoop that I am not considering?

According to the MapReduce paradigm, a Reducer should receive all the values for a particular key at once, not split across multiple invocations.

I hope to receive good feedback.

Upvotes: 1

Views: 2126

Answers (1)

harpun

Reputation: 4110

The weird thing that happens is that the combiner receives the same key multiple times

This is possible, as the combiner may be called multiple times by the MapReduce framework; see the documentation of JobConf#setCombinerClass():

The framework may invoke the combiner 0, 1, or multiple times, in both the mapper and reducer tasks. In general, the combiner is called as the sort/merge result is written to disk. The combiner must:

  • be side-effect free
  • have the same input and output key types and the same input and output value types
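
A sum over FloatWritable values, as in the question, satisfies both constraints, which is why it can in principle be registered as both combiner and reducer; a minimal sketch:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Valid combiner: side-effect free (no state kept across invocations) and
// type-preserving ((Text, FloatWritable) in, (Text, FloatWritable) out),
// so the framework may apply it 0, 1, or many times.
// Registered with conf.setCombinerClass(SumCombiner.class).
public class SumCombiner extends MapReduceBase
    implements Reducer<Text, FloatWritable, Text, FloatWritable> {

  public void reduce(Text key, Iterator<FloatWritable> values,
                     OutputCollector<Text, FloatWritable> output, Reporter reporter)
      throws IOException {
    float partialSum = 0;
    while (values.hasNext()) {
      partialSum += values.next().get();
    }
    // Emitting a partial sum is safe because addition is associative and
    // commutative; the reducer finishes the aggregation.
    output.collect(key, new FloatWritable(partialSum));
  }
}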

Upvotes: 1
