Ott Toomet


Checkpointing DataFrames in SparkR

I am looping over a number of CSV data files using R/Spark. About 1% of each file must be retained (filtered based on certain criteria) and merged with the next data file (I have used union/rbind). However, as the loop runs, the lineage of the data gets longer and longer, because Spark remembers all the previous datasets and filter() calls.

Is there a way to do checkpointing in the SparkR API? I have learned that Spark 2.1 has checkpointing for DataFrames, but this does not seem to be available from R.
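The loop looks roughly like this (a minimal sketch; the file paths, column name, and filter condition are placeholders for my actual criteria):

library(SparkR)
sparkR.session()

result <- NULL
for (f in list.files("data", pattern = "\\.csv$", full.names = TRUE)) {
  df <- read.df(f, source = "csv", header = "true", inferSchema = "true")
  kept <- filter(df, df$score > 0.99)   # retain roughly 1% of the rows
  result <- if (is.null(result)) kept else union(result, kept)
}
# After many iterations, 'result' drags along the lineage of every read and filter.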

Upvotes: 1

Views: 73

Answers (2)

Ott Toomet


An incomplete solution/workaround is to collect() your DataFrame into an R object and later re-parallelize it with createDataFrame(). This works well for small data, but for larger datasets it becomes too slow and Spark complains about tasks being too large.
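A minimal sketch of that workaround, assuming result is the accumulated SparkDataFrame:

local_df <- collect(result)          # pull the (small) filtered rows to the driver as a data.frame
result <- createDataFrame(local_df)  # re-distribute it; the new DataFrame has no prior lineage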

Upvotes: 0

glefait


We ran into the same issue with Scala/GraphX on a fairly large graph (a few billion records) while searching for connected components.

I'm not sure what is available in R for your specific version, but a usual workaround is to break the lineage by "saving" the data and then reloading it. In our case, we break the lineage every 15 iterations:

import scala.reflect.ClassTag
import org.apache.spark.graphx.Graph

// Write the graph to disk, drop the cached copy, and reload it, so the
// reloaded graph starts with a fresh (short) lineage.
def refreshGraph[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED], checkpointDir: String, iterationCount: Int, numPartitions: Int): Graph[VD, ED] = {
    val path = checkpointDir + "/iter-" + iterationCount
    saveGraph(g, path)            // saveGraph/loadGraph are our own persistence helpers
    g.unpersist()
    loadGraph(path, numPartitions)
}
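In SparkR, the same save-then-reload idea can be sketched with write.df()/read.df(); the parquet format, directory layout, and every-15-iterations schedule below are only illustrative:

checkpointToDisk <- function(df, dir, i) {
  path <- file.path(dir, paste0("iter-", i))
  write.df(df, path, source = "parquet", mode = "overwrite")
  read.df(path, source = "parquet")   # the reloaded DataFrame starts with a fresh lineage
}

# inside the loop, e.g.:
# if (i %% 15 == 0) result <- checkpointToDisk(result, "/tmp/spark-ckpt", i)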

Upvotes: 1
