Reputation: 41
I am running a simple MapReduce job written in Python and I noticed that when I test the script locally, I obtain a different output than when I run the job on Hadoop. My input looks something like this:
key1 val1
key1 val2
key1 val3
key1 val4
key2 val1
key2 val3
key2 val5
key3 val5
key4 val4
My mapper builds a dictionary that maps each value to the list of keys it appears with (e.g. val1 -> key1,key2; val2 -> key1; val3 -> key1,key2; ...). Then, for each value in the dictionary, I print every possible pair of keys. So the output of my mapper looks like:
key1_key2 1 # obtained from val1
key1_key2 1 # obtained from val3
key1_key4 1 # obtained from val4
key2_key3 1 # obtained from val5
The reducer counts the number of identical key pairs and prints the count. My mapper code is:
import sys

val_dic = dict()

def print_dic(dic):
    # For every value, emit each possible pair of keys that shared it.
    for val, key_array in dic.iteritems():
        for i in range(len(key_array) - 1):
            for j in range(i + 1, len(key_array)):
                key_pair = key_array[i] + "_" + key_array[j]
                print "{0}\t{1}".format(key_pair, "1")

for line in sys.stdin:
    key, val = line.strip().split("\t")
    if val not in val_dic:
        val_dic[val] = []
    val_dic[val].append(key)

print_dic(val_dic)
The reducer sums the counts of identical key pairs:
import sys

current_pair = None
current_count = 0

for line in sys.stdin:
    key_pair, count = line.strip().split("\t")
    count = int(count)
    if current_pair == key_pair:
        current_count += count
    else:
        # A new key pair starts: print the previous pair and its count.
        print "{0}\t{1}".format(current_pair, str(current_count))
        current_pair = key_pair
        current_count = count

print "{0}\t{1}".format(current_pair, str(current_count))
However, when I run it on Hadoop on a larger dataset, it seems that half of the results are missing. When I test it on the local machine with

cat input | mapper.py | sort | reducer.py > out-local

everything works fine as long as the input is reasonably small, but on bigger data sets (e.g. 1M entries) the local output file has almost twice as many entries as the one produced by the MapReduce job on Hadoop. Is there an error in the code, or am I missing something? Any help is highly appreciated.
Upvotes: 1
Views: 3047
Reputation: 50927
Your mapper generates all pairwise combinations of keys that it sees for a given value.
The model of map-reduce is that the mapper processes, in an embarrassingly parallel fashion, each record of the inputs, and emits key-value pairs. It maps records to key-value pairs. Indeed, a typical native (Java) mapper can only "see" one record at a time, so could never operate the way your streaming mapper does.
In the streaming API you can sort of "cheat" a bit and process the entire input split at once - for the whole chunk of the file given to you, you can process all of the input records in that chunk, so it is possible to do more than just map individual key-value pairs. But in general you do not have access to the entire input; the input is broken up into splits, and each mapper gets one split. If one split contains the whole input, then you don't have any parallelism in the map phase, and there is no reason to use Hadoop at all.
What is almost certainly happening here is that your input file gets broken up into two splits, and now your mapper can no longer find all of the key pairs corresponding to a given value, because it does not have all of the input records in its split. For instance, consider splitting the input file you've supplied into roughly two halves, one with all of the "key1" records and one with the rest. Running your map-reduce pipeline locally on all of the input at once produces the output you'd expect:
$ cat input1 input2 | ./map.py | sort | ./reduce.py
None 0
key1_key2 2
key1_key4 1
key2_key3 1
But the Hadoop workflow is that a different mapper gets each input split, and their outputs are only combined in the shuffle/reduce phase:
$ cat input1 | ./map.py > output1
$ cat input2 | ./map.py > output2
$ cat output1 output2 | sort | ./reduce.py
None 0
key2_key3 1
so now you're missing results. This is inevitable, because in any case where it would make sense to use Hadoop, no individual mapper is going to see all of the data.
You'll need to refactor things so that the map simply emits (value, key) pairs, and the reducer gathers all of the keys for a given value and then generates every key pair, each with a count of 1. A second map-reduce step then does the actual counting.
So you'd have a map1.py and a reduce1.py:
#!/usr/bin/env python
# map1.py
import sys

for line in sys.stdin:
    key, val = line.strip().split("\t")
    # Invert the record: emit the value as the new key so the shuffle groups by value.
    print "{0}\t{1}".format(val, key)
#!/usr/bin/env python
# reduce1.py
import sys

def emit_keypairs(keylist):
    # Emit every pair of keys that shared the same value, with a count of 1.
    for i in range(len(keylist) - 1):
        for j in range(i + 1, len(keylist)):
            key_pair = keylist[i] + "_" + keylist[j]
            print "{0}\t{1}".format(key_pair, "1")

current_word = None
current_keylist = []

for line in sys.stdin:
    line = line.strip()
    word, key = line.split('\t', 1)
    if current_word == word:
        current_keylist.append(key)
    else:
        if current_word:
            emit_keypairs(current_keylist)
        current_word = word
        current_keylist = [key]

# do not forget to output the last word if needed!
if current_word is not None:
    emit_keypairs(current_keylist)
Run those, and then basically just run a wordcount on the output. This will be robust to splitting the input file:
$ cat input1 | ./map1.py > map1
$ cat input2 | ./map1.py > map2
$ cat map1 map2 | sort | ./reduce1.py
key1_key2 1
key1_key2 1
key1_key4 1
key2_key3 1
and then another map-reduce phase with wordcount will produce the expected results.
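For completeness, here is a minimal sketch of what that second phase could look like; the file names map2.py and reduce2.py are just placeholders. The mapper is a simple pass-through, since the pairs already carry a count of 1, and the reducer is essentially your original counting reducer with a guard so it doesn't print a leading "None 0" line.

#!/usr/bin/env python
# map2.py - identity mapper: forward each "key-pair<TAB>1" record unchanged.
import sys

for line in sys.stdin:
    sys.stdout.write(line)

#!/usr/bin/env python
# reduce2.py - sum the counts for each key pair (input arrives sorted on the pair).
import sys

current_pair = None
current_count = 0

for line in sys.stdin:
    key_pair, count = line.strip().split("\t")
    count = int(count)
    if current_pair == key_pair:
        current_count += count
    else:
        if current_pair is not None:
            print "{0}\t{1}".format(current_pair, current_count)
        current_pair = key_pair
        current_count = count

if current_pair is not None:
    print "{0}\t{1}".format(current_pair, current_count)

Chaining this after the first phase on the split intermediate files reproduces the result you get locally:

$ cat map1 map2 | sort | ./reduce1.py | ./map2.py | sort | ./reduce2.py
key1_key2 2
key1_key4 1
key2_key3 1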
Upvotes: 1