Rahul

Reputation: 115

How to create RDD inside map function

I have an RDD of key/value pairs, and for each key I need to call a function that accepts an RDD. So I tried RDD.map, and inside the map I created an RDD using sc.parallelize(value) and passed that RDD to my function, but since Spark does not support creating an RDD within an RDD, this does not work.

Can you please suggest a solution for this situation?

I am looking for a solution like the one suggested in the thread below, but the problem I have is that my keys are not fixed; I can have any number of keys.
How to create RDD from within Task?
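For reference, the failing pattern described above looks roughly like this (a sketch; `sc`, `func`, and the shape of the pair RDD are assumptions, not code from the post):

```scala
// pairRdd: RDD[(String, Seq[Int])], built on the driver.
val result = pairRdd.map { case (key, values) =>
  // This closure runs on an executor, where the SparkContext is not
  // available (sc is not serializable), so this fails at runtime --
  // nested RDDs are not supported in Spark.
  val innerRdd = sc.parallelize(values)
  func(innerRdd)
}
```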

Thanks

Upvotes: 0

Views: 3491

Answers (2)

koders

Reputation: 5812

That doesn't sound quite right. If the function needs to process the key/value pair, it should receive the pair as its parameter, not an RDD.

But if you really want to pass an RDD as a parameter, then instead of doing so inside a chained operation, you can create a reference to the RDD after preprocessing and pass that reference to the method.
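A minimal sketch of that idea (`rawRdd`, `parse`, `isValid`, and `myFunc` are illustrative names, not from the original post):

```scala
// Build the RDD once, on the driver, outside any transformation...
val preprocessed = rawRdd.map(parse).filter(isValid)

// ...then hand the driver-side reference to the function that needs an RDD.
val result = myFunc(preprocessed)
```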

Upvotes: 1

Mo Tao

Reputation: 1295

No, you shouldn't create an RDD inside an RDD.

Depending on the size of your data, there are two solutions:

1) If there are many keys and each key does not have too many values, turn the function that accepts an RDD into a function that accepts an Iterable. Then you can do something like

// rdd: RDD[(keyType, valueType)]
rdd.groupByKey()
  .map { case (key, values) =>
    func(values)
  }
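The rewritten function then operates on the grouped values for a single key directly, with no nested RDD. A sketch, with an illustrative body (the value type and computation are assumptions):

```scala
// Before: def func(values: RDD[Int]): Double
// After: operates on the Iterable that groupByKey produces for one key.
def func(values: Iterable[Int]): Double = {
  // Example computation: the mean of one key's values.
  values.sum.toDouble / values.size
}
```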

2) If there are few keys and each key has many values, then you should not group, because grouping collects all the values for a key onto a single executor, which may cause an OutOfMemoryError. Instead, run a job for each key, like

rdd.keys.distinct().collect()
  .foreach { key =>
    func(rdd.filter(_._1 == key))         
  }
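Since this approach scans the full RDD once per key, caching the input first can avoid recomputing it for every filter (a sketch of the same loop with caching added):

```scala
// Cache so each per-key filter reads from memory instead of
// recomputing the input lineage from scratch.
rdd.cache()

rdd.keys.distinct().collect()
  .foreach { key =>
    func(rdd.filter(_._1 == key))
  }
```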

Upvotes: 0
