stholy

Reputation: 322

Hadoop: Number of input records for reducer

Is there any way for each reducer process to determine the number of elements or records it has to process?

Upvotes: 0

Views: 6547

Answers (2)

ryanbwork

Reputation: 2153

Your reducer class must extend the MapReduce Reducer class:

Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>

and then implement the reduce method using the KEYIN/VALUEIN types specified in the extended Reducer class:

reduce(KEYIN key, Iterable<VALUEIN> values, org.apache.hadoop.mapreduce.Reducer.Context context)

The values associated with a given key can be counted via

int count = 0;
Iterator<VALUEIN> it = values.iterator();
while (it.hasNext()) {
  it.next(); // consume the value; we only need the count
  count++;
}

Though I'd propose doing this counting alongside your other processing, so as not to make two passes through your value set.
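For example, assuming Text keys and IntWritable values (both illustrative choices, not from the question), a single pass can accumulate the count alongside whatever other work the reducer does, with a running sum standing in for that other processing:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch only: counts the records for a key while also summing the values.
public class CountingReducer extends Reducer<Text, IntWritable, Text, Text> {
  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int count = 0;
    long sum = 0; // stand-in for your real per-value processing
    for (IntWritable v : values) {
      sum += v.get();
      count++; // the record count falls out of the same pass
    }
    context.write(key, new Text(count + " records, sum=" + sum));
  }
}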

EDIT

Here's an example vector of vectors that will dynamically grow as you add to it (so you won't have to statically declare your arrays, and hence don't need the size of the values set). This works best for non-regular data (i.e. the number of columns is not the same for every row in your input csv file), but has the most overhead.

Vector<Vector<String>> table = new Vector<Vector<String>>();

Iterator<Text> it = values.iterator();
while (it.hasNext()) {

  Text t = it.next();
  String[] cols = t.toString().split(",");

  Vector<String> row = new Vector<String>(); // each csv row becomes its own vector
  for (int i = 0; i < cols.length && StringUtils.isNotEmpty(cols[i]); i++) {
    row.addElement(cols[i]); // here we're adding a new column for every value in the csv row
  }

  table.addElement(row);
}

Then you can access the Mth column of the Nth row via

table.get(N).get(M);

Now, if you know the number of columns is fixed, you could modify this to use a Vector of arrays, which would probably be a little faster and more space-efficient.
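A minimal sketch of that variant, replacing the loop above inside the same reduce method (the fixed column count is assumed, not checked):

Vector<String[]> table = new Vector<String[]>();

for (Text t : values) {
  // with a fixed column count, the array from split() can serve
  // directly as the row, avoiding a per-column Vector
  table.addElement(t.toString().split(","));
}

String cell = table.get(N)[M]; // Mth column of the Nth row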

Upvotes: 1

Chris White

Reputation: 30089

Short answer: no, not ahead of time; the reducer has no knowledge of how many values the iterable is backed by. The only way you can do this is to count as you iterate, but then you can't re-iterate over the iterable again.

Long answer: the iterable is actually backed by a sorted byte array of the serialized key/value pairs. The reducer has two comparators: one to sort the key/value pairs in key order, then a second to determine the boundary between keys (known as the key grouper). Typically the key grouper is the same as the key ordering comparator.

When iterating over the values for a particular key, the underlying context examines the next key in the array and compares it to the previous key using the grouping comparator. If the comparator determines they are equal, iteration continues; otherwise iteration for this particular key ends. So you can see that you cannot determine ahead of time how many values you will be passed for any particular key.

You can actually see this in action if you create a composite key, say a Text/IntWritable pair. In the compareTo method, sort first by the Text and then by the IntWritable field. Next create a Comparator to be used as the group comparator, which considers only the Text part of the key. Now, as you iterate over the values in the reducer, you should be able to observe the IntWritable part of the key changing with each iteration.
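A rough sketch of such a pair (TextIntPair and FirstGroupingComparator are names invented here for illustration; in practice each public class lives in its own file):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Composite key: ordered by the Text part first, then the IntWritable part.
public class TextIntPair implements WritableComparable<TextIntPair> {
  private final Text first = new Text();
  private final IntWritable second = new IntWritable();

  public Text getFirst() {
    return first;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    first.write(out);
    second.write(out);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    first.readFields(in);
    second.readFields(in);
  }

  @Override
  public int compareTo(TextIntPair o) {
    int cmp = first.compareTo(o.first);
    return cmp != 0 ? cmp : second.compareTo(o.second);
  }
}

// Group comparator: considers only the Text part, so every IntWritable
// variant of the same Text lands in a single reduce call.
public class FirstGroupingComparator extends WritableComparator {
  protected FirstGroupingComparator() {
    super(TextIntPair.class, true); // true = create key instances for deserialization
  }

  @Override
  public int compare(WritableComparable a, WritableComparable b) {
    return ((TextIntPair) a).getFirst().compareTo(((TextIntPair) b).getFirst());
  }
}

Wire the comparator in with job.setGroupingComparatorClass(FirstGroupingComparator.class); the reducer then receives one call per distinct Text, and the key's IntWritable part changes as you step through the values.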

Some code I've used before to demonstrate this scenario can be found on this pastebin.

Upvotes: 3
