Chris Nielsen

Reputation: 849

How to get the Reducer to emit according to key type

As a follow-up to this question, I have a Mapper that is going through lots of data and emitting ID numbers as keys with the value of 1. Every key has two parts, separated by a pipe delimiter, for example:

Mapper emits:
a|abc 1
b|efg 1
a|cba 1
a|abc 1
b|dhh 1
b|dhh 1

What I am trying to do is have the Reducer parse the keys. For every key of type 'a' (e.g. 'a|abc'), the Reducer should emit only duplicates; for every other type (for example type 'b', e.g. 'b|abc'), it should emit everything, even if the count is only 1.

So the above data would yield:
a|abc 2
b|efg 1
b|dhh 2

In this case, 'a|cba 1' wouldn't be emitted because it is a key of type 'a' and it does not have a duplicate. Below is the code I have tried, and it works almost as expected except that I get 92 extra emits where the key is of type 'a' and the count is 1. Note: 92 is the number of Reduce tasks according to my MapReduce log.

Since I only want duplicates for key type 'a', how can I fix the Reducer so that I don't get those extra 92 emits of key type 'a' with a value of 1?

import sys
import codecs

sys.stdout = codecs.getwriter('utf-8')(sys.stdout)
inData = codecs.getreader('utf-8')(sys.stdin)

(last_key, tot_cnt) = (None, 0)
for line in inData:
    (key, val) = line.strip().split("\t")
    if last_key != key:
        k = key.split('|')
        v_id = k[0]
        if v_id == 'a':
            if tot_cnt > 1:
                sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
        else:
            sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))

        (last_key, tot_cnt) = (key, int(val))
    else:
        (last_key, tot_cnt) = (key, tot_cnt + int(val))
if last_key:
    if v_id == 'a':
        if tot_cnt > 1:
            sys.stdout.write("%s\t%s\n" % (last_key, tot_cnt))
    else:
        sys.stdout.write("%s\t%s\n" % (last_key, tot_cnt))

Upvotes: 1

Views: 241

Answers (1)

Manjunath Ballur

Reputation: 6343

Your code has the following bugs:

  1. v_id is only assigned inside the loop. Declare it at the top level, so that it is defined before the loop and visible everywhere.

    Change this line:

    (last_key, tot_cnt) = (None, 0)
    

    To:

    (last_key, tot_cnt, v_id) = (None, 0, None)
    
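  2. The split should be done on last_key, not on the current key, because the key being emitted is last_key. When the current key is "b|dhh" and the last key is "a|abc", v_id must be the type of "a|abc". In the original code, each switch from an 'a' key to a key of another type used the new key's type, so the final 'a' key was emitted even with a count of 1. In sorted input that switch happens at most once per reduce task, which is consistent with the 92 extra emits.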

    Change this code:

    if last_key != key:
        k = key.split('|')
        v_id = k[0]
        if v_id == 'a':
            if tot_cnt > 1:
                sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
        else:
            sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
    

    To:

    if last_key != key:
        if last_key is not None:
            k = last_key.split('|')
            v_id = k[0]
    
            if v_id == 'a':
                if tot_cnt > 1:
                    sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
            else:
                sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
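
One way to keep the emit decision tied to the key being flushed is to put the rule in a small helper that takes the key and its total. This is only a sketch: `should_emit` is a hypothetical name that does not appear in the original script, and it assumes every key contains the '|' delimiter.

```python
def should_emit(key, count):
    """Return True if a finished key should be written out.

    Keys of type 'a' are emitted only when duplicated (count > 1);
    keys of every other type are always emitted.
    """
    key_type = key.split('|', 1)[0]
    return key_type != 'a' or count > 1


# The type is always read from the key that is passed in, so it cannot be
# confused with the type of the key that follows it in the input.
print(should_emit('a|cba', 1))  # False
print(should_emit('a|abc', 2))  # True
print(should_emit('b|efg', 1))  # True
```

With a helper like this, both the check inside the loop and the check after the loop can call it with (last_key, tot_cnt).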
    

So the modified reducer code looks like this:

import sys
import codecs

sys.stdout = codecs.getwriter('utf-8')(sys.stdout)
inData = codecs.getreader('utf-8')(sys.stdin)

(last_key, tot_cnt, v_id) = (None, 0, None)

for line in inData:
    (key, val) = line.strip().split("\t")
    if last_key != key:
        if last_key is not None:
            k = last_key.split('|')
            v_id = k[0]

            if v_id == 'a':
                if tot_cnt > 1:
                    sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
            else:
                sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))

        (last_key, tot_cnt) = (key, int(val))
    else:
        (last_key, tot_cnt) = (key, tot_cnt + int(val))

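if last_key:
    # Recompute the type from the final key; at this point v_id still
    # holds the type of the key that was flushed before it (or None).
    v_id = last_key.split('|')[0]
    if v_id == 'a':
        if tot_cnt > 1:
            sys.stdout.write("%s\t%s\n" % (last_key, tot_cnt))
    else:
        sys.stdout.write("%s\t%s\n" % (last_key, tot_cnt))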

When I ran this, I got the output:

a|abc   2
b|dhh   2
b|efg   1

Note: I am not a Python expert, and I suspect this code can be optimized, so check the script for corner cases and redundant checks.
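
As one possible simplification, the last_key bookkeeping can be replaced with itertools.groupby, which groups adjacent lines that share a key. This is a sketch rather than a drop-in replacement. It assumes Python 3, input lines of the form key<TAB>count, and input that is already sorted by key (which is how a streaming reducer receives it). The name `reduce_counts` and the sample list are made up for illustration.

```python
from itertools import groupby


def reduce_counts(lines):
    """Yield (key, total) pairs from sorted 'key<TAB>count' lines."""
    # Split each non-empty line into [key, count].
    pairs = (line.rstrip("\n").split("\t", 1) for line in lines if line.strip())
    # groupby only merges adjacent equal keys, so the input must be sorted.
    for key, group in groupby(pairs, key=lambda pair: pair[0]):
        total = sum(int(count) for _, count in group)
        # Type 'a' keys are emitted only when duplicated; others always.
        if key.split("|", 1)[0] != "a" or total > 1:
            yield key, total


# Sample input from the question, sorted by key.
sample = ["a|abc\t1\n", "a|abc\t1\n", "a|cba\t1\n",
          "b|dhh\t1\n", "b|dhh\t1\n", "b|efg\t1\n"]
for key, total in reduce_counts(sample):
    print("%s\t%d" % (key, total))
```

In an actual reducer script, sys.stdin would be passed in place of the sample list. Because each group is complete before the rule is applied, no separate check is needed after the loop for the final key.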

Upvotes: 1
