Reputation: 483
I have DocsRDD: RDD[(String, String)]
val DocsRDD = sc.wholeTextFiles("myDirectory/*" , 2)
DocsRDD:
Doc1.txt , bla bla bla .....\n bla bla bla \n bla ... bla
Doc2.txt , bla bla bla .....bla \n bla bla \n bla ... bla
Doc3.txt , bla bla bla .....\n bla bla bla \n bla ... bla
Doc4.txt , bla bla \n .....\n bla bla bla bla \n ... bla
Is there an efficient, elegant way to extract n-grams from these with mapPartitions? So far I have tried everything; I have read everything I could find about mapPartitions at least five times, but I still cannot understand how to use it! It seems way too difficult to manipulate. In short, I want:
val NGramsRDD = DocsRDD.map(x => (x._1 , x._2.sliding(n) ) )
but efficiently with mapPartitions. My basic misunderstanding of mapPartitions is :
OneDocRDD : RDD[String]
val OneDocRDD = sc.textFile("myDoc1.txt", 2)
  .mapPartitions(s1 => s2)  // expected signature: s1: Iterator[String] => s2: Iterator[String]
I cannot understand this! Since when is s1 an Iterator[String]? s1 is a String after sc.textFile.
Alright, my second question is: will mapPartitions improve my performance over map in this situation?
Last but not least: can f() be:
f(Iterator[String]) : Iterator[Something else?]
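For example (just a sketch of what I mean), something like this, which would turn OneDocRDD: RDD[String] into an RDD[Int] of line lengths:
val lineLengthsRDD = OneDocRDD.mapPartitions((it: Iterator[String]) => it.map(_.length))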
Upvotes: 3
Views: 17843
Reputation: 1317
I'm not sure that .mapPartitions will help (at least, not given the example), but using .mapPartitions would look like:
val NGramsRDD = sc.wholeTextFiles("myDirectory/*", 2)
  .mapPartitions(iter => {
    // Here you can initialize objects that you need once per partition,
    // instead of once for each x as in .map.
    // wholeTextFiles gives (filename, content) pairs, so x._1 / x._2 work:
    iter.map(x => (x._1, x._2.sliding(n)))
  })
Normally you want to use .mapPartitions to create/initialize an object that you don't want to ship to the worker nodes (for example, because it is too big) or that can't be serialized. Without .mapPartitions you would have to create it inside .map, which would be inefficient because the object would be created anew for each x.
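For instance, here is a minimal sketch (assuming word-level n-grams and that n is in scope) where the Regex is compiled once per partition instead of once per document:
val wordNGramsRDD = sc.wholeTextFiles("myDirectory/*", 2)
  .mapPartitions { iter =>
    val wordPattern = "\\w+".r  // compiled once per partition
    iter.map { case (name, text) =>
      val words = wordPattern.findAllIn(text).toVector
      (name, words.sliding(n).map(_.mkString(" ")).toVector)
    }
  }
This keeps the expensive setup out of the per-element loop, which is exactly what .mapPartitions buys you over .map.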
Upvotes: 9