As a follow-up to this question, I have a Mapper that processes a large amount of data and emits ID numbers as keys, each with a value of 1. Every key has two parts separated by a pipe (|) delimiter, for example:
Mapper emits:
a|abc 1
b|efg 1
a|cba 1
a|abc 1
b|dhh 1
b|dhh 1
I want the Reducer to parse each key. For keys of type 'a' (e.g. 'a|abc'), it should emit only duplicates, that is, keys whose total count is greater than 1. For every other type (for example type 'b', e.g. 'b|abc'), it should emit every key, even if its count is only 1.
So the above data would yield:
a|abc 2
b|efg 1
b|dhh 2
In this case, 'a|cba 1' is not emitted because it is a type 'a' key without a duplicate. Below is the code I have tried. It works almost as expected, except that I get 92 extra emits of type 'a' keys with a count of 1. Note: 92 is the number of Reduce tasks according to my MapReduce log.
Since I only want duplicates for key type 'a', how can I fix the Reducer so that I don't get those extra 92 emits of key type 'a' with a value of 1?
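For reference, the intended result can be pinned down with a small in-memory sketch that sums the counts per key with collections.Counter and then applies the type 'a' rule. It assumes all mapper output fits in memory, which a real Reducer cannot assume, so it only serves as an oracle for checking the Reducer's output on small samples; the variable names are illustrative.

```python
from collections import Counter

# Mapper output from the example above, as (key, value) pairs.
mapper_output = [("a|abc", 1), ("b|efg", 1), ("a|cba", 1),
                 ("a|abc", 1), ("b|dhh", 1), ("b|dhh", 1)]

# Sum the values per key, as the Reducer is meant to do.
totals = Counter()
for key, val in mapper_output:
    totals[key] += val

# Keep type 'a' keys only when duplicated; keep every other type.
expected = {key: count for key, count in totals.items()
            if key.split("|", 1)[0] != "a" or count > 1}
print(sorted(expected.items()))
# [('a|abc', 2), ('b|dhh', 2), ('b|efg', 1)]
```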
import sys
import codecs
sys.stdout = codecs.getwriter('utf-8')(sys.stdout)
inData = codecs.getreader('utf-8')(sys.stdin)
(last_key, tot_cnt) = (None, 0)
for line in inData:
    (key, val) = line.strip().split("\t")
    if last_key != key:
        k = key.split('|')
        v_id = k[0]
        if v_id == 'a':
            if tot_cnt > 1:
                sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
        else:
            sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
        (last_key, tot_cnt) = (key, int(val))
    else:
        (last_key, tot_cnt) = (key, tot_cnt + int(val))
if last_key:
    if v_id == 'a':
        if tot_cnt > 1:
            sys.stdout.write("%s\t%s\n" % (last_key, tot_cnt))
    else:
        sys.stdout.write("%s\t%s\n" % (last_key, tot_cnt))
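The following are the bugs in your code:
Initialize v_id at the global level, alongside last_key and tot_cnt, so that it is always defined, even when no key change assigns it (for example, when a reduce task receives only one distinct key).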
Change this line:
(last_key, tot_cnt) = (None, 0)
To:
(last_key, tot_cnt, v_id) = (None, 0, None)
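The following split should be on last_key, not on the current key. When the current key is "b|dhh" and the last key is "a|abc", the group being written out is "a|abc", so v_id must come from "a|abc". Because each reduce task receives its keys in sorted order, with all type 'a' keys before the type 'b' keys, the last 'a' key in every task that receives both types is currently tested as type 'b' and emitted even when its count is 1. That matches the one extra line per Reduce task that you observed.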
Change this code:
if last_key != key:
    k = key.split('|')
    v_id = k[0]
    if v_id == 'a':
        if tot_cnt > 1:
            sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
    else:
        sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
To:
if last_key != key:
    if last_key is not None:
        k = last_key.split('|')
        v_id = k[0]
        if v_id == 'a':
            if tot_cnt > 1:
                sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
        else:
            sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
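The effect of this change can be checked outside Hadoop with a short sketch in Python 3 that replays the key-change logic on one reduce task's sorted input. The function reduce_partition and its split_last_key flag are hypothetical names for illustration. Unlike the original script, the sketch skips the None key on the first line and always classifies the final group by its own key, so the only difference between the two runs is which key the type is read from.

```python
def reduce_partition(lines, split_last_key):
    """Replay the reducer's key-change logic on one reduce task's sorted input.

    split_last_key=False takes the type from the incoming key (the original bug);
    split_last_key=True takes it from last_key, the key whose group just ended.
    Returns the emitted (key, count) pairs.
    """
    out = []
    last_key, tot_cnt = None, 0

    def emit_if_allowed(group_key, count, v_id):
        # Type 'a' keys need a count above 1; every other type is always emitted.
        if v_id != 'a' or count > 1:
            out.append((group_key, count))

    for line in lines:
        key, val = line.strip().split("\t")
        if last_key != key:
            if last_key is not None:
                source = last_key if split_last_key else key
                emit_if_allowed(last_key, tot_cnt, source.split('|')[0])
            last_key, tot_cnt = key, int(val)
        else:
            tot_cnt += int(val)
    # In this sketch the final group is always classified by its own key.
    if last_key is not None:
        emit_if_allowed(last_key, tot_cnt, last_key.split('|')[0])
    return out


# Mapper output for one reduce task, already sorted by key.
sample = ["a|abc\t1", "a|abc\t1", "a|cba\t1", "b|dhh\t1", "b|dhh\t1", "b|efg\t1"]
print(reduce_partition(sample, split_last_key=False))
# [('a|abc', 2), ('a|cba', 1), ('b|dhh', 2), ('b|efg', 1)]
print(reduce_partition(sample, split_last_key=True))
# [('a|abc', 2), ('b|dhh', 2), ('b|efg', 1)]
```

Only the split_last_key=False run emits a|cba with a count of 1, because that group is classified by the incoming key b|dhh.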
So the modified reducer looks like this:
import sys
import codecs
sys.stdout = codecs.getwriter('utf-8')(sys.stdout)
inData = codecs.getreader('utf-8')(sys.stdin)
(last_key, tot_cnt, v_id) = (None, 0, None)
for line in inData:
    (key, val) = line.strip().split("\t")
    if last_key != key:
        if last_key is not None:
            # Decide whether to emit the group that just ended,
            # based on the type of last_key (not the incoming key)
            k = last_key.split('|')
            v_id = k[0]
            if v_id == 'a':
                if tot_cnt > 1:
                    sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
            else:
                sys.stdout.write("%s\t%s\n" % (last_key,tot_cnt))
        (last_key, tot_cnt) = (key, int(val))
    else:
        (last_key, tot_cnt) = (key, tot_cnt + int(val))
# Flush the final group; its type must also come from last_key
if last_key:
    v_id = last_key.split('|')[0]
    if v_id == 'a':
        if tot_cnt > 1:
            sys.stdout.write("%s\t%s\n" % (last_key, tot_cnt))
    else:
        sys.stdout.write("%s\t%s\n" % (last_key, tot_cnt))
When I ran this, I got the output:
a|abc 2
b|dhh 2
b|efg 1
Note: I am not a Python expert, and I think this code can be optimized, so check it for corner cases and redundant checks.
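One possible simplification, offered as a sketch that has not been run on a cluster, replaces the manual last_key bookkeeping with itertools.groupby. That removes the None check and the duplicated final flush, and the type check always uses the key whose group is being emitted. It assumes Python 3, where sys.stdin already yields text, so the codecs wrappers from the original are dropped; parse, should_emit and reduce_lines are hypothetical helper names.

```python
from itertools import groupby


def parse(line):
    """Split a 'type|id<TAB>count' line into (key, count)."""
    key, val = line.strip().split("\t")
    return key, int(val)


def should_emit(key, count):
    """Type 'a' keys are emitted only when duplicated; other types always are."""
    return key.split("|", 1)[0] != "a" or count > 1


def reduce_lines(lines):
    """Yield one 'key<TAB>total' line per distinct key in key-sorted input."""
    # Reducer input arrives sorted by key, so equal keys are adjacent and
    # groupby yields each key's lines as a single group.
    pairs = (parse(line) for line in lines if line.strip())
    for key, group in groupby(pairs, key=lambda pair: pair[0]):
        total = sum(count for _, count in group)
        if should_emit(key, total):
            yield "%s\t%d" % (key, total)


sample = ["a|abc\t1\n", "a|abc\t1\n", "a|cba\t1\n",
          "b|dhh\t1\n", "b|dhh\t1\n", "b|efg\t1\n"]
for out in reduce_lines(sample):
    print(out)
```

Used as the streaming reducer, the sample loop would be replaced by importing sys and running: for out in reduce_lines(sys.stdin): print(out)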