Reputation: 931
I have a large job running on an Amazon Elastic MapReduce (EMR) cluster. By large, I mean I'm processing over 800,000 files with 25,000+ records per file. In my test runs I have been using 100 m1.medium spot instances for processing.
The job seemed to run correctly; however, I noticed that the output files (part-00000, part-00001, etc.) contain records with the same key in more than one file. Aren't records with the same key supposed to be reduced together in EMR?
Any insight would be appreciated.
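For reference, here is a minimal sketch of why identical keys are normally expected to end up in a single part file. Hadoop's default HashPartitioner assigns each key to a reducer as (hashCode & Integer.MAX_VALUE) % numReduceTasks, so byte-identical keys always map to the same reducer. The sketch below is an illustration only: `partition_for` and `group_by_partition` are hypothetical helper names, and CRC32 stands in for Java's `hashCode`, so the actual partition numbers will differ from what Hadoop computes.

```python
import zlib


def partition_for(key, num_reducers):
    """Pick a reducer index for a key, mimicking hash partitioning.

    Assumption: CRC32 is only a deterministic stand-in for the Java
    hashCode that Hadoop's HashPartitioner actually uses.
    """
    h = zlib.crc32(key.encode("utf-8")) & 0x7FFFFFFF  # keep it non-negative
    return h % num_reducers


def group_by_partition(keys, num_reducers):
    """Return {partition index: set of distinct keys routed to it}."""
    parts = {}
    for key in keys:
        parts.setdefault(partition_for(key, num_reducers), set()).add(key)
    return parts
```

Under this scheme a key can only show up in two part files if the keys Hadoop sees are not actually identical (for example, they differ in whitespace or include part of the value), so comparing the raw bytes of the duplicated keys is probably a reasonable first check.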
Upvotes: 2
Views: 931
Reputation: 1188
I am running into the same issue. I am using EMR to create an "inverted index" with the streaming API:
-input s3n://mybucket/html2 -output s3n://mybucket/results -mapper s3n://mybucket/mapper.py -reducer s3n://mybucket/reduce.py
Here s3n://mybucket/html2 contains a few HTML files, and mapper.py is:
import sys


def main(args):
    for line in sys.stdin:
        line = line.strip()
        words = line.split()
        for word in words:
            # do some preprocessing
            if word.startswith("http://"):
                # output the URL with a count of 1
                print("%s,%s" % (word, 1))
            else:
                # cleanup HTML tags
                url = get_url()  # irrelevant here (definition omitted)
                print("%s,%s" % (word, url))


if __name__ == "__main__":
    main(sys.argv)
and reduce.py is:
import sys


def main(args):
    current_word = None
    current_count = 0
    current_url_list = []
    key = None
    for line in sys.stdin:
        line = line.strip()
        (key, val) = line.split(',', 1)
        # If key is a URL, act as a word-count reducer
        if key.startswith("http:"):
            # convert count (currently a string) to int
            try:
                count = int(val)
            except ValueError:
                # count was not a number, so silently
                # ignore/discard this line
                continue
            # this IF-switch only works because Hadoop sorts map output
            # by key (here: word) before it is passed to the reducer
            if current_word == key:
                current_count += count
            else:
                if current_word:
                    # Check whether the previous key was a URL
                    if current_word.startswith('http:'):
                        print('%s,%s' % (current_word, current_count))
                    else:
                        # previous key was a regular word
                        print('%s,%s' % (current_word, ','.join(current_url_list)))
                current_count = count
                current_word = key
        else:
            # If key is a word, act as a URL-list-appending reducer
            if current_word == key:
                if val not in current_url_list:
                    current_url_list.append(val)
            else:  # Got to a new key
                if current_word:
                    # Check whether the previous key was a URL
                    if current_word.startswith("http:"):
                        print('%s,%s' % (current_word, current_count))
                    else:
                        # previous key was a regular word
                        print('%s,%s' % (current_word, ','.join(current_url_list)))
                current_url_list = []
                current_url_list.append(val)
                current_word = key
    # Emit the final key, which the loop above never prints
    if current_word:
        if current_word.startswith("http:"):
            print('%s,%s' % (current_word, current_count))
        else:
            print('%s,%s' % (current_word, ','.join(current_url_list)))


if __name__ == "__main__":
    main(sys.argv)
I am starting this flow with the AWS console wizard ("Create new job flow"). Apart from setting the input, output, mapper and reducer scripts and the log path, I am leaving everything at its default.
The output consists of a few files, and the same key appears in more than one of them (each time with different values).
Maybe this helps shed more light on the issue and resolve it.
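One possible cause worth checking, as a sketch rather than a confirmed diagnosis: by default Hadoop Streaming treats everything before the first tab character in a mapper output line as the key, and when a line contains no tab the entire line becomes the key with a null value. The mapper above prints word,value with a comma, so Hadoop would see "word,urlA" and "word,urlB" as two different keys, and they could be routed to different reducers. The helper names `split_streaming_line` and `emit` and the example URL below are hypothetical.

```python
def split_streaming_line(line, separator="\t"):
    """Split a mapper output line into (key, value).

    Mirrors the documented Hadoop Streaming default: the key is the text
    before the first separator; with no separator the whole line is the
    key and the value is None.
    """
    line = line.rstrip("\n")
    if separator in line:
        key, value = line.split(separator, 1)
        return key, value
    return line, None


def emit(key, value):
    """Format a record so that only `key` is used for sorting and partitioning."""
    return "%s\t%s" % (key, value)


# Comma-separated output: the whole line is treated as the key.
print(split_streaming_line("hello,http://a.example/1"))
# prints ('hello,http://a.example/1', None)

# Tab-separated output: only the word is the key.
print(split_streaming_line(emit("hello", "http://a.example/1")))
# prints ('hello', 'http://a.example/1')
```

If this is the cause, printing key and value separated by a tab in mapper.py, and splitting on the tab instead of the comma in reduce.py, should bring all values for one word to the same reducer. Hadoop Streaming's documentation also describes a stream.map.output.field.separator property for changing the separator instead.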
Upvotes: 1