Reputation: 267
from pyspark import SparkContext, SparkConf
from operator import add
import datetime
import sys

conf = SparkConf().setAppName("test")
sc = SparkContext(conf=conf)

def convertion(num):
    # convert a unix timestamp to a 'YYYY-MM-DD' date string
    return datetime.datetime.fromtimestamp(num).strftime('%Y-%m-%d')

def compute(strs, num):
    # 'apple' amounts count as negative
    if strs == 'apple':
        return -num
    return num

rdd = sc.parallelize([
    {'user': 'user',  'tpe': 'apple', 'timstamp': 1500000000, 'amount': 1},
    {'user': 'user',  'tpe': 'pear',  'timstamp': 1500000001, 'amount': 2},
    {'user': 'user2', 'tpe': 'apple', 'timstamp': 1505000002, 'amount': 3}
])
rdd = rdd.map(lambda x: ((x['user'],convertion(x['timstamp'])),compute(x['tpe'],x['amount'])))
rdd.reduceByKey(lambda x, y: x+y).take(3)
print(rdd.collect())
The output is wrong: [(('user', '2017-07-13'), -1), (('user', '2017-07-13'), 2), (('user2', '2017-09-09'), -3)]
I want the output to be:
[(('user', '2017-07-13'), 1), (('user2', '2017-09-09'), -3)]
I think I did not use reduceByKey correctly. Could someone enlighten me on how to group them by the key tuple?
Thank you!
Upvotes: 1
Views: 410
Reputation: 14845
reduceByKey (like every Spark transformation) returns a new RDD. In your code that new RDD is never assigned to a variable, so the result of take(3) is simply discarded. When rdd.collect() is called in the last line, the variable rdd still references the RDD created by rdd = rdd.map(...), and that is what gets printed.
Assign the result of reduceByKey to a variable and drop the take(3):
rdd = rdd.map(lambda x: ((x['user'],convertion(x['timstamp'])),compute(x['tpe'],x['amount'])))
rdd = rdd.reduceByKey(lambda x, y: x+y)
print(rdd.collect())
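Since operator.add is already imported in the question, the same fix can also be written as one chain (a minimal sketch, assuming rdd is the original parallelized RDD from the question; the ordering of collect() results is not guaranteed):

result = (rdd.map(lambda x: ((x['user'], convertion(x['timstamp'])),
                             compute(x['tpe'], x['amount'])))
             .reduceByKey(add)   # sum the amounts per (user, date) key
             .collect())
print(result)  # expected: [(('user', '2017-07-13'), 1), (('user2', '2017-09-09'), -3)]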
Upvotes: 3