Sailormoon

Reputation: 267

reduceByKey in PySpark with multiple key fields in a tuple

from pyspark import SparkContext, SparkConf

import datetime

import sys

 
conf = SparkConf().setAppName("test")

sc = SparkContext(conf=conf)

from operator import add

def convertion(num):

    return datetime.datetime.fromtimestamp(num).strftime('%Y-%m-%d')

def compute(strs, num):

    if strs == 'apple':

        return -num

    return num

rdd = sc.parallelize([

    {'user':'user','tpe':'apple','timstamp':1500000000,'amount':1},

    {'user':'user','tpe':'pear','timstamp':1500000001,'amount':2},

    {'user':'user2','tpe':'apple','timstamp':1505000002,'amount':3}

])

rdd = rdd.map(lambda x: ((x['user'],convertion(x['timstamp'])),compute(x['tpe'],x['amount'])))

rdd.reduceByKey(lambda x, y: x+y).take(3)

print(rdd.collect())

The output is wrong: [(('user', '2017-07-13'), -1), (('user', '2017-07-13'), 2), (('user2', '2017-09-09'), -3)]

I want the output to be: [(('user', '2017-07-13'), 1), (('user2', '2017-09-09'), -3)]

I think I did not use reduceByKey correctly. Could someone show me how to group the records by the key tuple?

Thank you!

Upvotes: 1

Views: 410

Answers (1)

werner

Reputation: 14845

reduceByKey, like all Spark transformations, returns a new RDD. That new RDD is never assigned to a variable, so the result of the reduction is simply discarded once take(3) finishes.

When rdd.collect() is called in the last line, the variable rdd still references the RDD created by rdd = rdd.map(...), so the contents after the map call are what gets printed.
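
A minimal sketch of this behaviour, assuming sc is the SparkContext from the question:

base = sc.parallelize([1, 2, 3])

doubled = base.map(lambda x: x * 2)  # map returns a *new* RDD; base is untouched

print(base.collect())     # [1, 2, 3]

print(doubled.collect())  # [2, 4, 6]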

The result of reduceByKey should be assigned to a variable and the take(3) should be removed:

rdd = rdd.map(lambda x: ((x['user'],convertion(x['timstamp'])),compute(x['tpe'],x['amount'])))

rdd = rdd.reduceByKey(lambda x, y: x+y)

print(rdd.collect())
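
With this change, rdd.collect() should return the result you asked for: [(('user', '2017-07-13'), 1), (('user2', '2017-09-09'), -3)], since the two 'user' entries for 2017-07-13 are summed (-1 + 2 = 1).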

Upvotes: 3
