Reputation: 15942
I wrote a MapReduce job that took ngram counts on a dataset. The results are in one hundred 300MB files in the format <ngram>\t<count>. I want to combine these into a single result, but my few attempts at combining have crashed ("task tracker has gone away"). I had my timeout at 8 hours, and this crash occurred around 8.5 hours, so the two might be related. I had # reducers = 5 (same as # of nodes). Maybe I just need to leave more time, although the error doesn't seem to indicate that. I suspect my nodes are getting overloaded and becoming unresponsive. My theory is that my reducer could use some optimization.
I'm using cat for my mapper, and the following Python script for my reducer:
#!/usr/bin/env python
import sys

# accumulate a total count per key across all input lines
counts = {}
for line in sys.stdin:
    line = line.strip()
    key, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:
        continue
    if key not in counts:
        counts[key] = 0
    counts[key] += count

# emit the keys in sorted order with their totals
for key in sorted(counts.keys()):
    print '%s\t%s' % (key, counts[key])
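For scale (rough figures based on the sizes above): 100 files × 300 MB is about 30 GB of ngram lines, or roughly 6 GB per reducer with 5 reducers, and the counts dictionary above keeps an entry for every distinct ngram it sees, so if most ngrams are unique a reducer's memory use can approach that, before even counting Python object overhead.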
Update:
As I hinted at in one of my comments, I am confused about what sorting Hadoop does automatically. In the web UI, the reducer status shows a few different phases, including "sort" and "reduce". From this I assume that Hadoop sorts the mapper output before sending it to reduce, but what isn't clear is whether the sorting is applied to all data sent to a reducer, or to each file before it is reduced. In other words, my mapper takes the 100 files, splits this into 400 outputs, each simply cat-ing them to the reducer, and then the reducers (5 total) each receive 80 of these streams. Does the sort combine all 80, or does it sort one, reduce it, and so on? Based on the graphs, which could clearly be non-indicative of the actual behavior, the sort process takes place before any reducing. If the sorting does sort all the input files, then I can simplify my reducer to not store a dictionary of all counts, and just print out the key-totalCount pair once the key changes.
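To make the question concrete: if the merge is global per reducer, a reducer's stdin would look something like this (toy lines, not my real data), with every occurrence of a key contiguous:
a b\t3
a b\t11
a c\t7
a c\t2
and in that case the reducer only ever needs to keep one running total at a time.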
Regarding the use of a combiner, I don't think this would be beneficial in my case since the data I'm reducing has already been reduced in the 100 files I'm trying to combine. Since my # nodes = # reducers (5 & 5), there is nothing to combine that the reducer isn't already doing.
Upvotes: 2
Views: 1432
Reputation: 15942
The problem was my misunderstanding of how MapReduce works. All data going into a reducer is sorted. My code above was completely unoptimized. Instead, I simply keep track of the current key, then print out the previous key's total when a new key shows up.
#!/usr/bin/env python
import sys

cur_key = None
cur_key_count = 0
for line in sys.stdin:
    line = line.strip()
    key, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:
        continue
    # if new key, output the last key's result, note the current key, and reset the count
    if key != cur_key:
        if cur_key is not None:
            print '%s\t%s' % (cur_key, cur_key_count)
        cur_key = key
        cur_key_count = 0
    cur_key_count += count

# print the final key, if one was seen
if cur_key is not None:
    print '%s\t%s' % (cur_key, cur_key_count)
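For what it's worth, the same sorted-input logic can also be written with itertools.groupby, which groups consecutive identical keys; this is just an equivalent sketch, not what I actually ran:
#!/usr/bin/env python
import itertools
import sys

def parse_lines(stream):
    # yield (key, count) pairs, skipping malformed lines just like the script above
    for line in stream:
        try:
            key, count = line.rstrip('\n').split('\t', 1)
            yield key, int(count)
        except ValueError:
            continue

# groupby only groups *consecutive* keys, so this relies on Hadoop having
# already sorted the reducer's input, exactly like the script above
for key, group in itertools.groupby(parse_lines(sys.stdin), key=lambda kv: kv[0]):
    print '%s\t%d' % (key, sum(count for _, count in group))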
Upvotes: 2
Reputation: 170
Use top to check that your reducer is CPU bound and not IO bound (possibly causing swapping) as it runs.
8 hours / 20 jobs per host is 24 minutes per 300MB job.
You could possibly use a heapq so that the data structure built in memory is kept sorted; see section 8.4.1 of:
http://docs.python.org/library/heapq.html
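For instance, since each of the 100 part files above is already written in sorted key order, one way to apply this idea is to merge them lazily with heapq.merge and sum runs of equal keys; a rough sketch (file paths come from the command line, and the script name is just a placeholder):
#!/usr/bin/env python
import heapq
import itertools
import sys

def read_counts(path):
    # yield (key, count) pairs from one part file that is already sorted by key
    with open(path) as f:
        for line in f:
            key, count = line.rstrip('\n').split('\t', 1)
            yield key, int(count)

# one lazy stream per part file, e.g. python merge_counts.py part-*
streams = [read_counts(p) for p in sys.argv[1:]]

# heapq.merge never loads a whole file; it keeps only one pending line per stream in memory
merged = heapq.merge(*streams)
for key, group in itertools.groupby(merged, key=lambda kv: kv[0]):
    print '%s\t%d' % (key, sum(count for _, count in group))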
Upvotes: 1