Reputation: 509
I have two RDD formats. The first is ((provider, currency), value), where the key is (provider, currency); the second is (provider, value), where the key is provider.
I want to transform an RDD A from the (provider, value) format into the ((provider, currency), value) format. I have an RDD B in the ((provider, currency), value) format, from which I'll take the keys. I'll then use these keys to expand RDD A, so that every value from the (provider, value) RDD is repeated for every currency of that provider in the new ((provider, currency), value) RDD.
How could this be done in an efficient way, without having to collect() the RDDs and loop through them?
For example:
An item from RDD A would be:
(1773570, 4135.7998046875)
then some of the keys from RDD B would be
[(1773570, 'EUR/USD'), (1773570, 'GBP/USD'), (1773570, 'USD/CAD')]
the output RDD should be:
[((1773570, 'EUR/USD'), 4135.7998046875), ((1773570, 'GBP/USD'), 4135.7998046875), ((1773570, 'USD/CAD'), 4135.7998046875)]
A possible solution would be:
    def get_keys(rdd):
        return rdd.map(lambda item: (item[0])).collect()

    def canonicalize_keys(sc, feature, keys):
        def transform(item, keys):
            return [
                ((item[0], currency_pair), item[1])
                for provider_id, currency_pair in keys
                if provider_id == item[0]]
        return sc.parallelize(feature
            .map(lambda item: transform(item, keys))
            .reduce(lambda a, b: a + b))
Here I get the keys from RDD B using get_keys and then use them to transform RDD A. The problem is that with a lot of currency pairs I get OutOfMemoryErrors from the JVM.
Upvotes: 3
Views: 3402
Reputation: 3084
Try this:
Given Ardd = RDD[(provider, value)] and Brdd = RDD[((provider, currency), value)], you want to join Ardd with the keys of Brdd so that newRDD has the form RDD[((provider, currency), value)], where value is the value taken from Ardd.
To do that, what we do is this:
One Line Solution:
newRDD = Ardd.join(Brdd.map(lambda x: x[0])).map(lambda x: ((x[0], x[1][1]), x[1][0]))
With Step by Step Explanation:
Get the keys from Brdd: Brdd_keys = Brdd.map(lambda x: x[0]). The output has the form RDD[(provider, currency)].
Join Ardd and Brdd_keys: AB = Ardd.join(Brdd_keys). The output has the form RDD[(provider, (value, currency))].
Map to the final form: newRDD = AB.map(lambda x: ((x[0], x[1][1]), x[1][0])). The output now has the form RDD[((provider, currency), value)].
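The three steps can be put together into a small runnable sketch. The names expand_by_keys and run_example, and the placeholder values stored in B, are illustrative assumptions rather than part of the answer above. The PySpark import sits inside run_example so that the function itself relies only on the RDD methods map and join.

```python
def expand_by_keys(a_rdd, b_rdd):
    """Repeat each (provider, value) of a_rdd for every (provider, currency) key of b_rdd.

    Hypothetical helper name; it only wraps the map/join/map chain described above.
    """
    # (provider, currency): the key of B, reinterpreted as a key/value pair
    b_keys = b_rdd.map(lambda kv: kv[0])
    # (provider, (value, currency)): inner join on provider
    joined = a_rdd.join(b_keys)
    # ((provider, currency), value): the target layout
    return joined.map(lambda kv: ((kv[0], kv[1][1]), kv[1][0]))


def run_example():
    """Run the question's sample data through expand_by_keys on a local Spark context."""
    from pyspark import SparkContext  # assumes PySpark is installed

    sc = SparkContext.getOrCreate()
    a = sc.parallelize([(1773570, 4135.7998046875)])
    # The values stored in B are placeholders; only its keys are used.
    b = sc.parallelize([
        ((1773570, 'EUR/USD'), 0.0),
        ((1773570, 'GBP/USD'), 0.0),
        ((1773570, 'USD/CAD'), 0.0),
    ])
    # collect() is used here only to display the small sample.
    return sorted(expand_by_keys(a, b).collect())
```

Because join is an inner join, a provider in A that has no key in B is dropped from the result. If such providers should be kept, leftOuterJoin could be used instead, in which case their currency would be None. Nothing is brought to the driver until the final collect() in run_example, so the expansion itself stays distributed.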
Upvotes: 4