Reputation: 1956
I am looping over a number of CSV data files using R/Spark. About 1% of each file must be retained (filtered based on certain criteria) and merged with the next data file (I have used union/rbind). However, as the loop runs, the lineage of the data gets longer and longer, because Spark remembers all the previous datasets and filter()s.
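Roughly what the loop looks like, as a minimal sketch (the file paths and the filter condition here are placeholders):

library(SparkR)
sparkR.session()

merged <- NULL
for (f in list.files("data", pattern = "\\.csv$", full.names = TRUE)) {
  df   <- read.df(f, source = "csv", header = "true", inferSchema = "true")
  kept <- filter(df, df$value > 0.99)   # placeholder criterion, keeps ~1% of rows
  merged <- if (is.null(merged)) kept else union(merged, kept)
  # each iteration appends another read + filter to the plan of `merged`,
  # so its lineage keeps growing
}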
Is there a way to do checkpointing in the Spark R API? I have learned that Spark 2.1 has checkpointing for DataFrames, but this does not seem to be available from R.
Upvotes: 1
Views: 73
Reputation: 1956
An incomplete solution/workaround is to collect() your DataFrame into an R object and later re-parallelize it with createDataFrame(). This works well for small data, but for larger datasets it becomes too slow and Spark complains about tasks being too large.
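In code, the round trip looks roughly like this (assuming the accumulated SparkDataFrame is called merged):

local_df <- collect(merged)            # pull the (small) filtered result into plain R
merged   <- createDataFrame(local_df)  # fresh SparkDataFrame with no lineage attached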
Upvotes: 0
Reputation: 1691
We ran into the same issue with Scala/GraphX on a fairly large graph (a few billion records) while searching for connected components.
I'm not sure what is available in R for your specific version, but a usual workaround is to break the lineage by "saving" the data and then reloading it. In our case, we break the lineage every 15 iterations:
import scala.reflect.ClassTag
import org.apache.spark.graphx.Graph

// saveGraph/loadGraph are our own helpers that write the graph to disk and
// read it back, which truncates the lineage accumulated so far
def refreshGraph[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED], checkpointDir: String,
                                             iterationCount: Int, numPartitions: Int): Graph[VD, ED] = {
  val path = checkpointDir + "/iter-" + iterationCount
  saveGraph(g, path)
  g.unpersist()
  loadGraph(path, numPartitions)
}
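In SparkR, the same save-then-reload trick should be expressible with write.df/read.df (an untested sketch; checkpoint_dir, i and merged are placeholder names):

path <- file.path(checkpoint_dir, paste0("iter-", i))
write.df(merged, path, source = "parquet", mode = "overwrite")  # materialize to disk
merged <- read.df(path, source = "parquet")                     # reload with a fresh, short lineage

Doing this every N iterations keeps the plan bounded, at the cost of one disk write/read per refresh.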
Upvotes: 1