When I join two RDDs, a key may match multiple records, but I want to keep only one record per key. For example:
```python
a = sc.parallelize([('a','20200621'), ('b','20200701')])
b = sc.parallelize([('a',('20200520',10)), ('a', ('20200620',20)), ('b', ('20200620', 30))])
a.join(b).collect()
# [('b', ('20200701', ('20200620', 30))),
#  ('a', ('20200621', ('20200520', 10))),
#  ('a', ('20200621', ('20200620', 20)))]
```
I only want to keep the record whose two dates are closest. For key a, 20200621 and 20200620 are closer than 20200621 and 20200520, so I want to keep ('a', ('20200621', ('20200620', 20))) instead of ('a', ('20200621', ('20200520', 10))).
The expected result is:
```
[('b', ('20200701', ('20200620', 30))),
 ('a', ('20200621', ('20200620', 20)))]
```
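One way to make "closest" concrete (an assumption, not something stated in the question) is to parse the YYYYMMDD strings with Python's datetime module and take the absolute difference in days. The helper name day_gap is hypothetical:

```python
from datetime import datetime

def day_gap(d1: str, d2: str) -> int:
    """Absolute number of days between two 'YYYYMMDD' date strings."""
    fmt = "%Y%m%d"
    return abs((datetime.strptime(d1, fmt) - datetime.strptime(d2, fmt)).days)

# Distances for the two candidate records of key 'a'
print(day_gap("20200621", "20200620"))  # 1
print(day_gap("20200621", "20200520"))  # 32
```

With this measure the record dated 20200620 wins for key a (1 day versus 32).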
I solved it by using reduceByKey:
```python
# Each value is ((date_b, amount), date_a); keep the one with the later date_b
b.join(a).reduceByKey(lambda x, y: x if x[0][0] > y[0][0] else y).collect()
```
Output:
```
[('b', (('20200620', 30), '20200701')), ('a', (('20200620', 20), '20200621'))]
```
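Keeping the latest date from b only matches "closest" when every b date falls before its a date, as it does in this data. A more general sketch, assuming "closest" means the smallest absolute day difference in either direction, uses a reducer that compares that gap. The names gap and keep_closest are hypothetical, and the list joined is a local stand-in for the output of a.join(b) from the question, so the reducer can be checked without a Spark cluster:

```python
from datetime import datetime
from functools import reduce

FMT = "%Y%m%d"

def gap(value):
    """Day distance for one joined value shaped (date_a, (date_b, amount))."""
    date_a, (date_b, _amount) = value
    return abs((datetime.strptime(date_a, FMT) - datetime.strptime(date_b, FMT)).days)

def keep_closest(x, y):
    """Reducer for reduceByKey: keep whichever value has the smaller date gap."""
    return x if gap(x) <= gap(y) else y

# Local stand-in for a.join(b).collect() from the question
joined = [
    ("b", ("20200701", ("20200620", 30))),
    ("a", ("20200621", ("20200520", 10))),
    ("a", ("20200621", ("20200620", 20))),
]

# Mimic reduceByKey: group values per key, then fold each group with keep_closest
grouped = {}
for key, value in joined:
    grouped.setdefault(key, []).append(value)
result = [(key, reduce(keep_closest, values)) for key, values in grouped.items()]
print(result)
# [('b', ('20200701', ('20200620', 30))), ('a', ('20200621', ('20200620', 20)))]
```

On an actual RDD the same function should drop in as a.join(b).reduceByKey(keep_closest). reduceByKey expects an associative, commutative function; taking the minimum gap qualifies, except that when two records are equally close either one may be kept.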