matsuninja

Reputation: 385

How to create collection of RDDs out of RDD?

I have an RDD[String], wordRDD. I also have a function that creates an RDD[String] from a string/word. I would like to create a new RDD for each string in wordRDD. Here are my attempts:

1) Failed because Spark does not support nested RDDs:

var newRDD = wordRDD.map( word => {
  // execute myFunction()
  (new MyClass(word)).myFunction()
})

2) Failed (possibly due to a scope issue?):

var newRDD = sc.parallelize(new Array[String](0))
val wordArray = wordRDD.collect
for (w <- wordArray){
  newRDD = sc.union(newRDD,(new MyClass(w)).myFunction())
}

My ideal result would look like:

// input RDD (wordRDD)
wordRDD: org.apache.spark.rdd.RDD[String] = ('apple','banana','orange'...)

// myFunction behavior
new MyClass('apple').myFunction(): RDD[String] = ('pple','aple'...'appl')

// after executing myFunction() on each word in wordRDD:
newRDD: RDD[String] = ('pple','aple',...,'anana','bnana','baana',...)

I found a related question, "Spark when union a lot of RDD throws stack overflow error", but it didn't address my issue.

Upvotes: 6

Views: 5131

Answers (2)

Till Rohrmann

Reputation: 13346

You cannot create an RDD from within another RDD.

However, you can rewrite your function myFunction: String => RDD[String], which generates all words obtained from the input by removing one letter, as a function modifiedFunction: String => Seq[String]. Because it returns a plain Seq instead of an RDD, it can be called from within an RDD transformation, so it is also executed in parallel on your cluster. With modifiedFunction in place, you obtain the final RDD with all words by calling wordRDD.flatMap(modifiedFunction).

The crucial point is to use flatMap (to map and flatten the transformations):

import org.apache.spark.{SparkConf, SparkContext}

object Test {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    val input = sc.parallelize(Seq("apple", "ananas", "banana"))

    // RDD("pple", "aple", ..., "nanas", ..., "anana", "bnana", ...)
    val result = input.flatMap(modifiedFunction)

    sc.stop()
  }

  // Returns every word obtained by deleting exactly one character from `word`.
  def modifiedFunction(word: String): Seq[String] = {
    word.indices map {
      index => word.substring(0, index) + word.substring(index + 1)
    }
  }
}
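To check what modifiedFunction produces without starting Spark, the same logic can be tried on a plain Scala collection. This is only a minimal sketch: the object name DeletionSketch and the sample words are made up for illustration, and Seq.flatMap stands in for RDD.flatMap, which flattens the per-word results in the same way.

```scala
object DeletionSketch {
  // Same logic as modifiedFunction above: drop one character at each index.
  def modifiedFunction(word: String): Seq[String] =
    word.indices.map(i => word.substring(0, i) + word.substring(i + 1))

  def main(args: Array[String]): Unit = {
    val words = Seq("apple", "kiwi") // hypothetical sample input

    // flatMap maps each word to its deletions and flattens them into one Seq.
    val all = words.flatMap(modifiedFunction)
    println(all.mkString(", "))

    // "apple" contains "pp", so "aple" is generated twice; distinct drops repeats.
    println(all.distinct.mkString(", "))
  }
}
```

Note that words with repeated adjacent letters yield duplicates. On an RDD, calling distinct() on the result removes them as well, at the cost of a shuffle.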

Upvotes: 3

Climbs_lika_Spyder

Reputation: 6724

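Use flatMap to get RDD[String] as you desire. Because myFunction itself returns an RDD, it cannot be called inside a transformation on wordRDD. Collect the words to the driver first and flatMap over the local array (this pulls all results through the driver, so it only suits small inputs):

val allWords = sc.parallelize(
  wordRDD.collect().flatMap { word =>
    (new MyClass(word)).myFunction().collect()
  }
)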
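If myFunction has to remain a String => RDD[String], one option is to call it on the driver for each collected word and combine the resulting RDDs with a single sc.union call, rather than chaining one union per word. The sketch below assumes wordRDD is small enough to collect. The names UnionSketch, deletions and allDeletions are hypothetical, and deletions is only a stand-in for new MyClass(word).myFunction(), whose real implementation is not shown in the question.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object UnionSketch {
  // Hypothetical stand-in for `new MyClass(word).myFunction()`:
  // builds an RDD of all words with one character removed.
  def deletions(sc: SparkContext, word: String): RDD[String] =
    sc.parallelize(word.indices.map(i => word.substring(0, i) + word.substring(i + 1)))

  // Creates one RDD per word on the driver, then merges them in one union.
  def allDeletions(sc: SparkContext, wordRDD: RDD[String]): RDD[String] = {
    val perWord: Seq[RDD[String]] = wordRDD.collect().toSeq.map(w => deletions(sc, w))
    sc.union(perWord)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("UnionSketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val wordRDD = sc.parallelize(Seq("apple", "banana"))
      allDeletions(sc, wordRDD).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Passing the whole Seq to sc.union yields a single union over all per-word RDDs instead of a chain of nested unions, which should keep the lineage shallow when there are many words. The flatMap approach is still likely to be preferable whenever the per-word logic can be expressed without an RDD.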

Upvotes: 3
