Reputation: 513
Given this example:
val someRDD = firstRDD.flatMap { case (x, y) => SomeFunc(y) }
val oneRDD = someRDD.reduceByKey(_ + _)
oneRDD.saveAsNewAPIHadoopFile("dir/to/write/to", classOf[Text], classOf[Text], classOf[TextOutputFormat[Text, Text]])
Which would be better to do?
val someRDD = firstRDD.flatMap { case (x, y) => SomeFunc(y) }.persist(storage.StorageLevel.MEMORY_AND_DISK_SER)
val oneRDD = someRDD.reduceByKey(_ + _)
oneRDD.saveAsNewAPIHadoopFile("dir/to/write/to", classOf[Text], classOf[Text], classOf[TextOutputFormat[Text, Text]])
OR
val someRDD = firstRDD.flatMap { case (x, y) => SomeFunc(y) }.persist(storage.StorageLevel.MEMORY_AND_DISK_SER)
val oneRDD = someRDD.reduceByKey(_ + _).persist(storage.StorageLevel.MEMORY_AND_DISK_SER)
oneRDD.saveAsNewAPIHadoopFile("dir/to/write/to", classOf[Text], classOf[Text], classOf[TextOutputFormat[Text, Text]])
or something else?
I see that it is good to persist when you are performing more than one action on the same RDD.
For example:
val newRDD = context.parallelize(0 until numMappers, numPartitions).persist(storage.StorageLevel.MEMORY_AND_DISK_SER) // persisted because two follow-on actions are performed on it
newRDD.count() // same RDD
newRDD.saveAsNewAPIHadoopFile() // same RDD
...other actions etc.
Here there is only one RDD and two actions in line. Should I persist at all?
Upvotes: 0
Views: 1249
Reputation: 823
From the Spark documentation:
Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey), even without users calling persist. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call persist on the resulting RDD if they plan to reuse it.
(I added bold around the statement above.)
Note that chaining transformations is fine. The performance problem occurs when you reuse an RDD without persisting it, because every action that touches it recomputes the whole lineage.
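If you do plan to reuse an RDD, that is where persist earns its keep. A minimal sketch reusing the names from your snippet, where oneRDD feeds two actions instead of one:

val someRDD = firstRDD.flatMap { case (x, y) => SomeFunc(y) }
val oneRDD = someRDD.reduceByKey(_ + _).persist(storage.StorageLevel.MEMORY_AND_DISK_SER)
oneRDD.count() // first action: computes oneRDD and caches its partitions
oneRDD.saveAsNewAPIHadoopFile("dir/to/write/to", classOf[Text], classOf[Text], classOf[TextOutputFormat[Text, Text]]) // second action: served from the cache, no recomputation
oneRDD.unpersist() // release the cached blocks once you are done with it

In your snippets as written, each RDD is consumed exactly once, so neither persist call should buy you anything there.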
Upvotes: 1