Reputation: 9571
I have a data set with approximately 1 billion data points, of which about 46 million are unique values that I want to extract.
I want to use Hadoop to extract these unique values, but I keep getting "Out of Memory" and Java heap size errors on Hadoop, while at the same time I can run this fairly easily on a single box using a Python set (a hash table, if you will).
I am using a fairly simple algorithm to extract the unique values: I parse the 1 billion lines in my mapper and output lines that look like this:
UniqValueCount:I a
UniqValueCount:I a
UniqValueCount:I b
UniqValueCount:I c
UniqValueCount:I c
UniqValueCount:I d
and then running the "aggregate" reducer to get the results, which should look like this for the above data set:
I 4
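For reference, my mapper boils down to something like this (a rough Hadoop Streaming sketch in Python; which field the value actually comes from is simplified here):

#!/usr/bin/env python
# Streaming mapper: emit one "UniqValueCount:I <value>" record per input line,
# so the "aggregate" reducer can count the distinct values for key I.
import sys

for line in sys.stdin:
    value = line.rstrip("\n").split("\t")[0]  # simplified: take the first field as the value
    print("UniqValueCount:I\t%s" % value)     # tab-separated key and value for Streaming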
This works well for a small set of values, but when I run it over the 1 billion data points (with the 46 million unique values I mentioned), the job fails.
I'm running this on Amazon's Elastic MapReduce, and even with six m2.4xlarge nodes (their highest-memory instance type, at 68.4 GB each) the job fails with "out of memory" errors.
Yet I am able to extract the unique values with Python code using a set data structure (a hash table) on a single m1.large (a much smaller box with 8 GB of memory). I am confused as to why the Hadoop job fails, since 46 million uniques should not take up that much memory.
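For what it's worth, the single-box version is essentially just this (the input file name is a placeholder):

# Collect the distinct values in an in-memory set and count them.
uniques = set()
with open("datapoints.txt") as f:  # placeholder file name
    for line in f:
        uniques.add(line.rstrip("\n").split("\t")[0])  # simplified: first field is the value
print(len(uniques))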
What could be going wrong? Am I using UniqValueCount incorrectly?
Upvotes: 1
Views: 1776
Reputation: 41428
You're probably getting the memory error in the shuffle; remember that Hadoop sorts the keys before starting the reducers. The sort itself is not necessary for most applications, but Hadoop uses it as a way to aggregate all the values belonging to a key.
In your example, your mappers end up writing the same values many times, while you only care about how many unique values there are for a given key. Here is what you're doing right now:
Mapper output:
I -> a
I -> a
I -> a
I -> a
I -> b
I -> a
I -> b
Reducer input:
I -> [a, a, a, a, b, a, b]
Reducer output:
I -> 2
But you really don't need to write 5*a or 2*b in this case; once would be enough, since you only care about uniques. So instead of counting the uniques in the reducer, you can cut a lot of overhead by making sure you only send each value once:
Mapper output:
I -> a
I -> b
Reducer input:
I -> [a, b]
Reducer output:
I -> 2
This effectively reduces the network traffic, and the shuffle becomes much simpler since there are fewer key/value pairs to sort.
You could do this in two ways: deduplicate the values inside the mapper itself (for example with an in-memory set), or use a combiner so that duplicate values are dropped before they hit the network.
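A minimal sketch of the mapper-side approach, assuming Hadoop Streaming with a Python mapper and that the value of interest is the first tab-separated field of each input line:

#!/usr/bin/env python
# Deduplicating Streaming mapper: each distinct value is emitted at most once
# per mapper, so far fewer records reach the sort/shuffle phase.
import sys

seen = set()
for line in sys.stdin:
    value = line.rstrip("\n").split("\t")[0]  # simplified: first field is the value
    if value not in seen:
        seen.add(value)
        print("UniqValueCount:I\t%s" % value)

The set only has to hold the distinct values of that mapper's own input split, not all 46 million; if a split still produces too many distinct values, the set can be cleared periodically, since the occasional duplicate emitted across flushes is harmless to a reducer that counts uniques.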
Upvotes: 2