CSDS

Reputation: 41

Hadoop MapReduce Streaming output different from the output of running MapReduce locally

I am running a simple mapreduce job written in python and I noticed that when I test the script locally, I obtain a different output than when I run the job on hadoop. My input is something like:

key1        val1
key1        val2
key1        val3
key1        val4
key2        val1
key2        val3
key2        val5
key3        val5
key4        val4

My mapper builds a dictionary that maps each value to the list of keys it appears with (e.g. val1 -> key1,key2 ; val2 -> key1 ; val3 -> key1,key2 ...). Then for each value in the dictionary I print all the possible key pairs. So the output of my mapper looks like:

key1_key2   1   # obtained from val1
key1_key2   1   # obtained from val3
key1_key4   1   # obtained from val4
key2_key3   1   # obtained from val5

The reducer counts the number of identical key pairs and prints the count. My mapper code is:

import sys

val_dic = dict()

def print_dic(dic):
    # for every value, emit each pair of keys that share that value
    for val, key_array in dic.iteritems():
        for i in range(len(key_array)-1):
            for j in range(i+1, len(key_array)):
                key_pair = key_array[i] + "_" + key_array[j]
                print "{0}\t{1}".format(key_pair, "1")

# build value -> list of keys over all the input this mapper sees
for line in sys.stdin:
    key, val = line.strip().split("\t")
    if val not in val_dic:
        val_dic[val] = []
    val_dic[val].append(key)

print_dic(val_dic)

The reducer counts the identical key pairs:

import sys

current_pair = None
current_count = 0

for line in sys.stdin:
    key_pair, count = line.strip().split("\t")
    count = int(count)
    if current_pair == key_pair:
        current_count += count
    else:
        # note: the very first iteration prints "None\t0", because current_pair starts as None
        print "{0}\t{1}".format(current_pair, current_count)
        current_pair = key_pair
        current_count = count

print "{0}\t{1}".format(current_pair, current_count)

However, when I run it on hadoop on a larger dataset, it seems that half the results are missing. When I test it on the local machine using

cat input | mapper.py | sort | reducer.py > out-local

it works fine if the input is reasonably small, but on bigger data sets (e.g. 1M entries) the local output file has almost twice as many entries as the one obtained from running the mapreduce job on hadoop. Is there an error in the code, or am I missing something? Any help is highly appreciated.

Upvotes: 1

Views: 3047

Answers (1)

Jonathan Dursi

Reputation: 50927

Your mapper generates all pairwise combinations of keys that it sees for a given value.

The model of map-reduce is that the mapper processes, in an embarrassingly parallel fashion, each record of the inputs, and emits key-value pairs. It maps records to key-value pairs. Indeed, a typical native (Java) mapper can only "see" one record at a time, so could never operate the way your streaming mapper does.

In the streaming api, you can sort of "cheat" a bit and process the entire input split at once: for the chunk of the file handed to your process, you can look at all of the input records together, and so do more than just map individual key-value pairs. But in general you do not have access to the entire input; the input is broken up into splits, and each mapper gets one split. If a single split contained the whole input, you would have no parallelism in the map phase, and there would be no reason to use hadoop at all.

What is almost certainly happening here is that your input file gets broken up into two splits, and your mapper can no longer find all the key pairs for a given value, because it does not have all of the input records in its split. For instance, consider breaking the input file you've supplied into two pieces, one with all the "key1" records and one with the rest. Running your map-reduce pipeline locally on all of the input at once produces the output you'd expect:

$ cat input1 input2 | ./map.py | sort | ./reduce.py 
None    0
key1_key2   2
key1_key4   1
key2_key3   1
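(For concreteness, assume the hypothetical split puts the four "key1" records in input1 and the remaining records in input2:)

input1:

key1        val1
key1        val2
key1        val3
key1        val4

input2:

key2        val1
key2        val3
key2        val5
key3        val5
key4        val4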

But the hadoop workflow is that different mappers get each input, and they're only combined in the shuffle/reduce phase:

$ cat input1 | ./map.py > output1
$ cat input2 | ./map.py > output2
$ cat output1 output2 | sort | ./reduce.py 
None    0
key2_key3   1

so now you're missing results. This is inevitable: in any case where it makes sense to use hadoop at all, no individual mapper is going to see all of the data.

You'll need to refactor things, so that the map simply emits (value, key) pairs, and then the reducer gathers all of the keys together for a given value and then generates all key pairs with a count. Then another map-reduce step will have to do the count.

So you'd have a map1.py and a reduce1.py:

#!/usr/bin/env python 
# map1.py

import sys

for line in sys.stdin:
    # emit the value first, so the shuffle/sort groups records by value
    key, val = line.strip().split("\t")
    print "{0}\t{1}".format(val, key)

#!/usr/bin/env python
# reduce1.py

import sys

def emit_keypairs(keylist):
    for i in range(len(keylist)-1):
        for j in range(i+1,len(keylist)):
            key_pair = keylist[i]+"_"+keylist[j]
            print "{0}\t{1}".format(key_pair,"1")

current_word = None
current_keylist = []

for line in sys.stdin:
    line = line.strip()
    word, key = line.split('\t', 1)

    if current_word == word:
        current_keylist.append(key)
    else:
        if current_word:
            emit_keypairs(current_keylist)
        current_word = word
        current_keylist = [key]

# emit the key pairs for the final group
if current_word is not None:
    emit_keypairs(current_keylist)

Run those, and then basically just run a wordcount on the output. This will be robust to splitting the input file:

$ cat input1 | ./map1.py > map1
$ cat input2 | ./map1.py > map2
$ cat map1 map2 | sort | ./reduce1.py 

key1_key2   1
key1_key2   1
key1_key4   1
key2_key3   1

and then another map-reduce phase with wordcount will produce the expected results.
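In case it's useful, a minimal sketch of that second pass might look like the following (the map2.py / reduce2.py names are placeholders of mine, not part of your code; the reducer is just the standard streaming sum-reducer pattern):

#!/usr/bin/env python
# map2.py -- placeholder identity mapper for the second pass; the first
# pass already emits "key_pair<TAB>1" lines, so there is nothing to transform

import sys

for line in sys.stdin:
    sys.stdout.write(line)

#!/usr/bin/env python
# reduce2.py -- placeholder sum reducer for the second pass
# (the usual streaming wordcount pattern)

import sys

current_pair = None
current_count = 0

for line in sys.stdin:
    key_pair, count = line.strip().split("\t")
    count = int(count)

    if current_pair == key_pair:
        current_count += count
    else:
        # emit the previous pair before starting a new group
        if current_pair is not None:
            print "{0}\t{1}".format(current_pair, current_count)
        current_pair = key_pair
        current_count = count

# emit the final pair, if any input was seen
if current_pair is not None:
    print "{0}\t{1}".format(current_pair, current_count)

Chained locally, this reproduces the counts from the full-input run (minus the spurious None line):

$ cat map1 map2 | sort | ./reduce1.py | ./map2.py | sort | ./reduce2.py
key1_key2   2
key1_key4   1
key2_key3   1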

Upvotes: 1
