rosefun

Reputation: 1867

Pyspark: how to keep only one record per key according to the value

When I join two RDDs, each key may have multiple records; I want to keep only one record per key. For example:

a = sc.parallelize([('a','20200621'), ('b','20200701')])
b = sc.parallelize([('a',('20200520',10)), ('a', ('20200620',20)), ('b', ('20200620', 30))])
a.join(b).collect()

[('b', ('20200701', ('20200620', 30))),
 ('a', ('20200621', ('20200520', 10))),
 ('a', ('20200621', ('20200620', 20)))]

I only want to keep the record whose two dates are closest. Here, for key 'a', 20200621 and 20200620 are closer (1 day apart) than 20200621 and 20200520 (32 days apart), so I want to keep ('a', ('20200621', ('20200620', 20))) instead of ('a', ('20200621', ('20200520', 10))).

The expected result is:

[('b', ('20200701', ('20200620', 30))),
 ('a', ('20200621', ('20200620', 20)))]

Upvotes: 0

Views: 23

Answers (1)

rosefun

Reputation: 1867

I solved it by using reduceByKey, keeping for each key the record with the latest date from b:

# keep, per key, the value whose date from b (x[0][0]) is later; yyyyMMdd strings sort lexicographically
b.join(a).reduceByKey(lambda x, y: x if x[0][0] > y[0][0] else y).collect()

Output:

[('b', (('20200620', 30), '20200701')), ('a', (('20200620', 20), '20200621'))]
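
Note that this keeps the latest date from b per key, which matches "closest" only when every date in b falls on or before the corresponding date in a. If b could also contain later dates, comparing the absolute difference is safer. A minimal sketch, assuming all dates are 'yyyyMMdd' strings (days_apart is a hypothetical helper, not part of the original answer):

from datetime import datetime

def days_apart(pair):
    # pair is one joined value: ((b_date, amount), a_date)
    (b_date, _), a_date = pair
    fmt = '%Y%m%d'
    return abs((datetime.strptime(a_date, fmt) - datetime.strptime(b_date, fmt)).days)

# keep, per key, the joined record whose two dates are closest together
b.join(a).reduceByKey(lambda x, y: x if days_apart(x) <= days_apart(y) else y).collect()
# [('b', (('20200620', 30), '20200701')), ('a', (('20200620', 20), '20200621'))]  (order may vary)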

Upvotes: 0
