João Melo

Reputation: 508

What determines the number of reducers and how to avoid bottlenecks regarding reducers?

Suppose I have a big tsv file with this kind of information:

2012-09-22 00:00:01.0   249342258346881024  47268866    0   0   0   bo
2012-09-22 00:00:02.0   249342260934746115  1344951     0   0   4   ot
2012-09-22 00:00:02.0   249342261098336257  346095334   1   0   0   ot
2012-09-22 00:05:02.0   249342261500977152  254785340   0   1   0   ot

I want to implement a MapReduce job that enumerates five-minute time intervals and filters some information from the TSV input. The output file would look like this:

0 47268866  bo
0 134495    ot
0 346095334 ot
1 254785340 ot

The key is the number of the interval, e.g., 0 refers to the interval from 2012-09-22 00:00:00.0 to 2012-09-22 00:04:59.

I don't know whether this problem doesn't fit the MapReduce approach or whether I'm just not thinking about it the right way. In the map function, I'm just passing the timestamp as the key and the filtered information as the value. In the reduce function, I count the intervals using global variables and produce the output mentioned above.

i. Does the framework determine the number of reducers automatically, or is it user defined? With one reducer, I think there is no problem with my approach, but I'm wondering whether a single reducer can become a bottleneck when dealing with really large files. Can it?

ii. How can I solve this problem with multiple reducers?

Any suggestions would be really appreciated! Thanks in advance!

EDIT:

The first question was answered by @Olaf, but the second still leaves me with some doubts regarding parallelism. The output of my map function is currently this (I'm just passing the timestamp with minute precision):

2012-09-22 00:00   47268866    bo
2012-09-22 00:00   344951      ot
2012-09-22 00:00   346095334   ot
2012-09-22 00:05   254785340   ot

So in the reduce function I receive inputs where the key represents the minute when the information was collected and the values are the information itself, and I want to enumerate five-minute intervals beginning with 0. I'm currently using a global variable to store the beginning of the interval, and when a key goes beyond it I increment the interval counter (which is also a global variable).

Here is the code:

private long stepRange = TimeUnit.MINUTES.toMillis(5);  // length of one interval
private long stepInitialMillis = 0;                     // start of the current interval
private int stepCounter = 0;                            // interval number, starting at 0

@Override
public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {

    // The key carries the timestamp (in milliseconds) of the minute these records belong to.
    long millis = Long.valueOf(key.toString());
    if (stepInitialMillis == 0) {
        stepInitialMillis = millis;
    } else {
        // When the current minute falls outside the open interval, start a new one.
        if (millis - stepInitialMillis > stepRange) {
            stepCounter = stepCounter + 1;
            stepInitialMillis = millis;
        }
    }
    // Emit the interval number together with the original timestamp and value.
    for (Text value : values) {
        context.write(new Text(String.valueOf(stepCounter)),
                new Text(key.toString() + "\t" + value));
    }
}

So, with multiple reducers, my reduce function will be running on two or more nodes, in two or more JVMs, and I will lose the control given by the global variables, and I can't think of a workaround for my case.

Upvotes: 1

Views: 1755

Answers (2)

Eric Wadsworth

Reputation: 11

It looks like you want to aggregate some data by five-minute blocks. MapReduce with Hadoop works great for this sort of thing! There should be no reason to use any "global variables". Here is how I would set it up:

The mapper reads one line of the TSV. It grabs the timestamp, and computes which five-minute bucket it belongs in. Make that into a string, and emit it as the key, something like "20120922:0000", "20120922:0005", "20120922:0010", etc. As for the value that is emitted along with that key, just keep it simple to start with, and send on the whole tab-delimited line as another Text object.
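A minimal mapper sketch along those lines; the class name FiveMinuteBucketMapper, the column layout (timestamp in the first tab-separated field), and the key format are assumptions based on the sample input, not tested code:

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper: buckets each TSV line into a five-minute interval key.
public class FiveMinuteBucketMapper extends Mapper<LongWritable, Text, Text, Text> {

    private static final long FIVE_MINUTES = TimeUnit.MINUTES.toMillis(5);
    private final SimpleDateFormat inFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S");
    private final SimpleDateFormat outFormat = new SimpleDateFormat("yyyyMMdd:HHmm");

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String[] fields = line.toString().split("\t");
        try {
            long millis = inFormat.parse(fields[0]).getTime();
            // Round the timestamp down to the start of its five-minute bucket.
            long bucketStart = (millis / FIVE_MINUTES) * FIVE_MINUTES;
            // Emit the bucket as the key and the whole line as the value.
            context.write(new Text(outFormat.format(new Date(bucketStart))), line);
        } catch (ParseException e) {
            // Skip malformed lines.
        }
    }
}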

Now that the mapper has determined how the data needs to be organized, it's the reducer's job to do the aggregation. Each reducer will get a key (one of the five-minute buckets), along with the list of all the lines that fit into that bucket. It can iterate over that list, and extract whatever it wants from it, writing output to the context as needed.
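A matching reducer sketch, again with an assumed class name and assumed column positions (it keeps only the id and the two-letter code from each line, to mirror the desired output):

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical reducer: for each five-minute bucket, keep only the id and the
// two-letter code from every line (columns 3 and 7 of the sample input).
public class FiveMinuteBucketReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text bucket, Iterable<Text> lines, Context context)
            throws IOException, InterruptedException {
        for (Text line : lines) {
            String[] fields = line.toString().split("\t");
            context.write(bucket, new Text(fields[2] + "\t" + fields[6]));
        }
    }
}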

As for the number of mappers, just let Hadoop figure that part out. Set the number of reducers to the number of nodes you have in the cluster, as a starting point. It should run just fine.
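For example, a driver fragment along these lines could wire it up, assuming the hypothetical classes above; the reducer count of 4 is just a placeholder:

Configuration conf = new Configuration();
Job job = new Job(conf, "five-minute-buckets"); // Job.getInstance(conf, ...) on newer Hadoop
job.setMapperClass(FiveMinuteBucketMapper.class);
job.setReducerClass(FiveMinuteBucketReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setNumReduceTasks(4); // placeholder: roughly one reducer per data node to start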

Hope this helps.

Upvotes: 1

Olaf

Reputation: 6289

The number of reducers depends on the configuration of the cluster, although you can limit the number of reducers used by your MapReduce job.

A single reducer would indeed become a bottleneck in your MapReduce job if you are dealing with any significant amount of data.

The Hadoop MapReduce engine guarantees that all values associated with the same key are sent to the same reducer, so your approach should work with multiple reducers. See the Yahoo! tutorial for details: http://developer.yahoo.com/hadoop/tutorial/module4.html#listreducing

EDIT: To guarantee that all values for the same time interval go to the same reducer, you would have to use some unique identifier of the time interval as the key. You would have to do it in the mapper. Reading your question again, unless you want to somehow aggregate the data across the records corresponding to the same time interval, you don't need any reducer at all.
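For instance, a mapper could be keyed directly by the interval index; in this sketch the class name, date format, and column positions are assumptions, and the index is counted from the epoch, so renumbering the intervals from 0 would be a separate step:

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper keyed by the five-minute interval index itself.
public class IntervalIndexMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    private static final long FIVE_MINUTES = TimeUnit.MINUTES.toMillis(5);
    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S");

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String[] fields = line.toString().split("\t");
        try {
            long millis = format.parse(fields[0]).getTime();
            // Every record of one five-minute interval gets the same key, so all of
            // them end up in the same reduce group (or the same output, if map-only).
            long intervalIndex = millis / FIVE_MINUTES;
            context.write(new LongWritable(intervalIndex),
                    new Text(fields[2] + "\t" + fields[6]));
        } catch (ParseException e) {
            // Skip malformed lines.
        }
    }
}

If no aggregation is needed, calling job.setNumReduceTasks(0) makes the job map-only, and the mapper output is written straight to the output directory without a shuffle.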

EDIT: As @SeanOwen pointed out, the number of reducers depends on the configuration of the cluster. Usually it is configured to between 0.95 and 1.75 times the maximum number of tasks per node times the number of data nodes. If the mapred.reduce.tasks value is not set in the cluster configuration, the default number of reducers is 1.
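For example, the same property can also be set per job in the driver; a minimal sketch with a placeholder value:

// Per-job equivalent of the cluster-wide mapred.reduce.tasks setting
// (the value 8 is just a placeholder).
Configuration conf = new Configuration();
conf.setInt("mapred.reduce.tasks", 8);
// With the newer mapreduce API this is equivalent to job.setNumReduceTasks(8).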

Upvotes: 2
