George Lydakis
George Lydakis

Reputation: 509

How to efficiently add a new key to an RDD in pyspark

I have 2 RDDs formats, the first one is ((provider, currency), value) where the key is (provider, currency) and the second one (provider, value) where the key is provider.

What I want to do is transform an RDD A from the (provider, value) format to ((provider, currency), value). I have an B ((provider, currency), value) RDD where I'll take the keys. Then I'll use these keys to expand RDD A so that every value from the (provider, value) RDD will repeat itself for every currency in the new ((provider, currency), value) RDD.

How could this be done in an efficient way, without having to collect() the RDDs and loop through them?

For example:

An item from RDD A would be:

(1773570, 4135.7998046875)

then some of the keys from RDD B would be

[(1773570, 'EUR/USD'), (1773570, 'GBP/USD'), (1773570, 'USD/CAD')]

the output RDD should be:

[((1773570, 'EUR/USD'), 4135.7998046875), ((1773570, 'GBP/USD'), 4135.7998046875), ((1773570, 'USD/CAD'), 4135.7998046875)]

A possible solution would be:

def get_keys(rdd):
    return rdd.map(lambda item: (item[0])).collect()

def canonicalize_keys(sc, feature, keys):
    def transform(item, keys):
        return [
            ((item[0], currency_pair), item[1])
                for provider_id, currency_pair in keys
                    if provider_id == item[0]]
    return sc.parallelize(feature
        .map(lambda item: transform(item, keys))
        .reduce(lambda a, b: a + b))

Here I get the keys from RDD B using get_keys and then I use these keys transform RDD A. The problem here is that if I have a lot of currency_pairs I get OutOfMemoryErrors from the JVM.

Upvotes: 3

Views: 3402

Answers (1)

jtitusj
jtitusj

Reputation: 3084

Try this: Given Ardd = RDD[(provider, value)] and Brdd = RDD[((provider, currency), value)], what you want to do is join Ardd and Brdd such that newRDD would be of the form RDD[((provider, currency), value)]. Where value refers to the value found from Ardd.

To do that, what we do is this:

One Line Solution:

newRDD = Ardd.join(Brdd.map(lambda x: x[0])).map(lambda x: ((x[0], x[1][1]), x[1][0]))

With Step by Step Explanation:

  1. Get keys from Brdd: Brdd_keys = Brdd.map(lambda x: x[0]). Output has the form: RDD[(provider, currency)]

  2. Join Ardd and Brdd_keys: AB = Ardd.join(Brdd_keys). Output has the form: RDD[(provider, (value, currency))]

  3. Map to final form: newRDD = AB.map(lambda x: ((x[0], x[1][1]), x[1][0])). Output now has the form RDD[((provider, currency), value)]

Upvotes: 4

Related Questions