Reputation: 1793
I have a fairly complex Apache PySpark pipeline which performs several transformations on a (very large) set of text files. The intended outputs of my pipeline are several intermediate stages of the pipeline itself. What is the best way (i.e. more efficient, but also more sparkling, in the sense of: better fitting the Spark programming model and style) to do this?
Right now, my code looks like the following:
# initialize the pipeline and perform the first set of transformations.
ctx = pyspark.SparkContext('local', 'MyPipeline')
rdd = ctx.textFile(...).map(...).map(...)
# first checkpoint: the `first_serialization` function serializes
# the data into properly formatted string.
rdd.map(first_serialization).saveAsTextFile("ckpt1")
# here, I have to read again from the previously saved checkpoint
# using a `first_deserialization` function that deserializes what has
# been serialized by the `first_serialization` function. Then it performs
# other transformations.
rdd = ctx.textFile("ckpt1").map(...).map(...)
and so on. I would like to get rid of the serialization methods and of the multiple save/read steps -- by the way, does this impact efficiency? I assume it does.
Any hint? Thanks in advance.
Upvotes: 3
Views: 1569
Reputation: 5782
This seems obviously simple, because it is, but I would recommend writing out the intermediate stages while continuing to reuse the existing RDD (side note: use Datasets/DataFrames instead of RDDs to get more performance) and continuing the processing, writing out intermediate results as you go.
There's no need to pay the penalty of reading from disk/network when you already have the data processed (ideally even cached!) for further usage.
Example using your own code:
# initialize the pipeline and perform the first set of transformations.
ctx = pyspark.SparkContext('local', 'MyPipeline')
rdd = ctx.textFile(...).map(...).map(...)
# first checkpoint: the `first_serialization` function serializes
# the data into properly formatted string.
string_rdd = rdd.map(first_serialization)
string_rdd.saveAsTextFile("ckpt1")
# reuse the existing RDD after writing out the intermediate results
rdd = rdd.map(...).map(...)
# `rdd` here is the same variable we used to create the `string_rdd` results above.
# Alternatively, you may want to continue from the `string_rdd` variable here
# instead of the original `rdd` variable.
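If the same intermediate data feeds more than one branch (a checkpoint write plus further processing), it can also help to cache it first, so Spark does not recompute the whole lineage for each action. Below is a minimal sketch of that idea using the DataFrame API mentioned above; the paths, column names and the placeholder transformations are hypothetical, just to make the example self-contained:

# a minimal sketch, assuming hypothetical input paths and transformations
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("MyPipeline").getOrCreate()

# first stage: read the text files and apply a stand-in transformation
df = spark.read.text("input/*.txt").selectExpr("value AS line")
stage1 = df.selectExpr("upper(line) AS line")      # stand-in for your real transformations

# cache before branching: one branch writes the checkpoint, the other continues
stage1.cache()
stage1.write.mode("overwrite").text("ckpt1")       # first intermediate output

stage2 = stage1.selectExpr("lower(line) AS line")  # further (hypothetical) transformations
stage2.write.mode("overwrite").text("ckpt2")       # second intermediate output

The key point is the same as in the RDD example: the data is materialized once, written out where needed, and reused in memory rather than re-read from disk.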
Upvotes: 3