Reputation: 385
I have an RDD[String], wordRDD. I also have a function that creates an RDD[String] from a string/word. I would like to create a new RDD for each string in wordRDD. Here are my attempts:
1) Failed because Spark does not support nested RDDs:
var newRDD = wordRDD.map( word => {
  // execute myFunction()
  (new MyClass(word)).myFunction()
})
2) Failed (possibly due to scope issue?):
var newRDD = sc.parallelize(new Array[String](0))
val wordArray = wordRDD.collect
for (w <- wordArray){
  newRDD = sc.union(newRDD,(new MyClass(w)).myFunction())
}
My ideal result would look like:
// input RDD (wordRDD)
wordRDD: org.apache.spark.rdd.RDD[String] = ("apple","banana","orange"...)
// myFunction behavior
new MyClass("apple").myFunction(): RDD[String] = ("pple","aple"..."appl")
// after executing myFunction() on each word in wordRDD:
newRDD: RDD[String] = ("pple","aple",...,"anana","bnana","baana",...)
I found a relevant question here: "Spark when union a lot of RDD throws stack overflow error", but it didn't address my issue.
Upvotes: 6
Views: 5131
Reputation: 13346
You cannot create an RDD from within another RDD.
However, you can rewrite your function myFunction: String => RDD[String], which generates all words obtainable from the input by removing one letter, into a function modifiedFunction: String => Seq[String] that can be used inside an RDD transformation. That way, it will also be executed in parallel on your cluster. With modifiedFunction, you obtain the final RDD with all words simply by calling wordRDD.flatMap(modifiedFunction).
The crucial point is to use flatMap, which maps each element and then flattens the results:
import org.apache.spark.{SparkConf, SparkContext}

object Test {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Test").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val input = sc.parallelize(Seq("apple", "ananas", "banana"))
    // RDD("pple", "aple", ..., "nanas", ..., "anana", "bnana", ...)
    val result = input.flatMap(modifiedFunction)
    result.collect().foreach(println)
    sc.stop()
  }

  // Returns every variant of `word` with exactly one character removed.
  def modifiedFunction(word: String): Seq[String] =
    word.indices.map { index =>
      word.substring(0, index) + word.substring(index + 1)
    }
}
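Plain Scala collections have the same map/flatMap semantics as an RDD, so the difference can be checked without a cluster. This is a Spark-free sketch in which a local Seq stands in for the RDD; the object name FlatMapDemo is made up for illustration. map keeps one inner collection per word, while flatMap concatenates them into a single Seq[String]:

```scala
// Spark-free sketch: a local Seq stands in for the RDD.
object FlatMapDemo {
  // Same logic as modifiedFunction above: drop the character at each position.
  def modifiedFunction(word: String): Seq[String] =
    word.indices.map(index => word.substring(0, index) + word.substring(index + 1))

  val words: Seq[String] = Seq("apple", "banana")

  // map keeps one inner Seq per word, giving Seq[Seq[String]]
  val nested: Seq[Seq[String]] = words.map(modifiedFunction)

  // flatMap concatenates the inner Seqs, giving Seq[String]
  val flattened: Seq[String] = words.flatMap(modifiedFunction)

  def main(args: Array[String]): Unit = {
    println(nested)
    println(flattened)
    println(flattened.distinct)
  }
}
```

Because "apple" has a doubled letter, "aple" appears twice in the flattened result; if duplicates are unwanted, call distinct() on the resulting RDD (or .distinct on a local Seq).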
Upvotes: 3
Reputation: 6724
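Use flatMap to get the RDD[String] you want:
val allWords = wordRDD.flatMap { word =>
  (new MyClass(word)).myFunction().collect()
}
Note that this only works if myFunction does not itself create an RDD: Spark does not allow RDD transformations or actions such as collect() to be invoked inside another RDD's transformation, so with myFunction returning an RDD[String] this fails for the same reason as attempt 1. myFunction has to return a local collection such as Seq[String] instead, in which case the .collect() is no longer needed.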
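Since an RDD cannot be built or collected inside another RDD's transformation, here is a minimal sketch of how the question's MyClass might be adapted so that a flatMap like the one above can run on the executors. The real MyClass is not shown in the question, so this class body and the method name myLocalFunction are hypothetical, and a local Seq stands in for wordRDD so the sketch runs without Spark:

```scala
// Hypothetical stand-in for the question's MyClass (the real class is not shown).
class MyClass(word: String) {
  // Returns a plain Seq instead of an RDD, so it is safe to call inside flatMap.
  def myLocalFunction(): Seq[String] =
    word.indices.map(i => word.substring(0, i) + word.substring(i + 1))
}

object MyClassDemo {
  def main(args: Array[String]): Unit = {
    // With Spark this would be: wordRDD.flatMap(word => new MyClass(word).myLocalFunction())
    val allWords = Seq("apple", "orange").flatMap(word => new MyClass(word).myLocalFunction())
    allWords.foreach(println)
  }
}
```

The MyClass instance is created inside the closure, on whichever executor processes the word, so only the word itself has to be shipped; nothing in the closure refers to the SparkContext.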
Upvotes: 3