Reputation: 137
Hi, I am trying to run a long Spark job that often fails with a StackOverflowError. The job reads a Parquet file and creates an RDD in a forEach loop. After doing some research I thought that checkpointing each RDD would solve my memory issues. (I have tried different settings for executor memory, memory overhead, parallelism, and repartitioning, and found the configuration that works best for the job, but it still fails sometimes depending on the load on our cluster.)
Now to my real issue. I am trying to create checkpoints by first reading the Parquet file into an RDD, then caching it, calling the checkpoint function, and then calling first() as an action to make the checkpoint happen. However, no checkpoints are created in the path that I specified, and in the YARN UI it says that the stage is skipped. Can anyone help me understand the problem? :)
ctx.getSparkContext().setCheckpointDir("/tmp/checkpoints");

public static void writeHhidToCouchbase(DataFrameContext ctx, List<String> filePathsStrings) {
  filePathsStrings
      .forEach(filePath -> {
        // Read the Parquet file for this path into a pair RDD of (uid, hhid)
        JavaPairRDD<String, String> rdd =
            UidHhidPerDay.getParquetFromPath(ctx, filePath);
        rdd.cache();
        rdd.checkpoint();
        rdd.first(); // action intended to materialize the cache and the checkpoint
        rdd.foreachPartition(p -> {
          CrumbsClient client = getClient();
          p.forEachRemaining(uids -> {
            Crumbs crumbs = client.getAsync(uids._1)
                .timeout(10, TimeUnit.SECONDS)
                .toBlocking()
                .first();
            String hHid = uids._2;
            if (hHid != null) {
              crumbs.getOrCreateSingletonCrumb(HouseholdCrumb.class).setHouseholdId(hHid);
              client.putSync(crumbs);
            }
          });
          client.shutdown();
        });
      });
}
The checkpoint is created once, in the first iteration, but never again. KR
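For reference, one way to verify whether a checkpoint actually happened is to ask the RDD itself after the action has run. This is just a small check, not part of my job; isCheckpointed() and getCheckpointFile() are part of the standard Java RDD API, and the checkpoint file only exists once an action has materialized the RDD:

JavaPairRDD<String, String> rdd =
    UidHhidPerDay.getParquetFromPath(ctx, filePath);
rdd.cache();
rdd.checkpoint();
rdd.first(); // the action that should trigger both the cache and the checkpoint

// Both calls are defined on JavaRDDLike; getCheckpointFile() returns an
// Optional that is only present after the checkpoint has been written.
System.out.println("checkpointed: " + rdd.isCheckpointed());
System.out.println("checkpoint file: " + rdd.getCheckpointFile());

Note also that a stage shown as "skipped" in the Spark/YARN UI is usually not an error: it means Spark found the stage's output already available (for example from the cache) and did not need to recompute it.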
Upvotes: 3
Views: 1593
Reputation: 137
My mistake, the partitions are actually created. The "first" partition that I mentioned above is a directory with the partitions inside it; a directory name like 8f987639-d5c8-46b8-a1e0-37081f9f8e00 is what confused me. However, the lineage comment from @ImDarrenG gave me some more insight: I created a new, repartitioned RDD from the first one, which I cache and checkpoint. This made the application more stable, with no failures.
JavaPairRDD<String, String> rdd =
    UidHhidPerDay.getParquetFromPath(ctx, filePath);
rdd.cache();
rdd.checkpoint();
rdd.first();
JavaPairRDD<String, String> rddToCompute = rdd.repartition(72);
rddToCompute.foreachPartition...
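For completeness, this is a sketch of how the whole pattern looks with the foreachPartition body from the question; the repartition count of 72 is simply the value that worked for our cluster and load:

// Read, cache and checkpoint the source RDD so its lineage is truncated
JavaPairRDD<String, String> rdd =
    UidHhidPerDay.getParquetFromPath(ctx, filePath);
rdd.cache();
rdd.checkpoint();
rdd.first(); // action that materializes the cache and the checkpoint

// Derive a repartitioned RDD from the checkpointed one; the downstream
// work now starts from the checkpointed data instead of the full lineage.
JavaPairRDD<String, String> rddToCompute = rdd.repartition(72);
rddToCompute.foreachPartition(p -> {
  CrumbsClient client = getClient();
  p.forEachRemaining(uids -> {
    Crumbs crumbs = client.getAsync(uids._1)
        .timeout(10, TimeUnit.SECONDS)
        .toBlocking()
        .first();
    String hHid = uids._2;
    if (hHid != null) {
      crumbs.getOrCreateSingletonCrumb(HouseholdCrumb.class).setHouseholdId(hHid);
      client.putSync(crumbs);
    }
  });
  client.shutdown();
});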
Upvotes: 1