Joe C

Reputation: 15684

What can cause my Spark Streaming checkpoint to be incomplete?

I am playing around with the Spark Streaming API, specifically testing the checkpointing feature. However, I am finding that in certain circumstances the state restored from the checkpoint is incomplete.

The following code is run in local[2] mode (though I have noticed a similar phenomenon when running it distributed) against version 2.1.0 (compiled against Scala 2.11):

public static void main (String[] args) throws Exception {
    SparkConf spark = new SparkConf();

    createAppendablePrintStream().println(ZonedDateTime.now() + " Starting stream");
    String checkpoint = "/export/spark/checkpoint"; // NFS mounted directory
    JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpoint, () -> {
        JavaStreamingContext x = new JavaStreamingContext(spark, Durations.seconds(5));
        x.checkpoint(checkpoint);
        JavaDStream<String> lines = x.socketTextStream("192.168.8.130", 7777); // IP address of my local VM
        JavaPairDStream<String, Integer> stateByType = lines
                .mapToPair(line -> new Tuple2(line.split(" ")[0], line))
                .updateStateByKey((Function2<List<String>, Optional<Integer>, Optional<Integer>>)
                        (values, state) -> Optional.of(state.orElse(0) + values.size()));
        stateByType.foreachRDD(rdd -> createAppendablePrintStream().println(ZonedDateTime.now() + " Current state: " + rdd.collectAsMap()));
        return x;
    });

    jssc.start();
    jssc.awaitTermination();
    createAppendablePrintStream().println(ZonedDateTime.now() + " Closing stream");
}

private static PrintStream createAppendablePrintStream() {
    try {
        return new PrintStream(new FileOutputStream("/tmp/result.txt", true));
    } catch (FileNotFoundException e) {
        throw new RuntimeException(e);
    }
}

When I add a new key to this stream and shut down the driver right away, the new key does not appear to be restored as part of the checkpoint, as evidenced by the following log excerpt:

2016-12-29T16:53:33.185Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:53:35.086Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:53:40.288Z[Europe/London] Current state: {WARN:=2, ERROR:=1, INFO:=1}
2016-12-29T16:53:43.695Z[Europe/London] Closing stream
2016-12-29T16:53:53.100Z[Europe/London] Starting stream
2016-12-29T16:54:08.154Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:13.226Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:15.026Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:15.768Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:17.136Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:17.521Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:18.795Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:19.360Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:20.634Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:25.052Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:30.066Z[Europe/London] Current state: {WARN:=2, ERROR:=1, ALERT:=1}

(Note the ALERT entry was added post-startup to show that the INFO entry is never returned.)

However, when I allow the new key to remain part of the state for a second batch interval, it is recovered from the checkpoint straight away, as demonstrated by this log excerpt:

2016-12-29T16:54:25.052Z[Europe/London] Current state: {WARN:=2, ERROR:=1}
2016-12-29T16:54:30.066Z[Europe/London] Current state: {WARN:=2, ERROR:=1, ALERT:=1}
2016-12-29T16:54:35.051Z[Europe/London] Current state: {WARN:=2, ERROR:=1, ALERT:=1}
2016-12-29T16:54:38.545Z[Europe/London] Closing stream
2016-12-29T16:54:47.306Z[Europe/London] Starting stream
2016-12-29T16:55:01.982Z[Europe/London] Current state: {WARN:=2, ERROR:=1, ALERT:=1}

Is there an explanation for this kind of incomplete state? Can this be addressed with a configuration change? Or do I need to file a bug report with the Spark folks?

Upvotes: 2

Views: 1017

Answers (2)

Kaidul

Reputation: 15875

To avoid this loss of past received data, Spark 1.2 introduced write ahead logs, which save the received data to fault-tolerant storage:

SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");

Otherwise some batches can be lost, and restoring from the checkpoint will produce an incomplete state.
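
For reference, a rough sketch of how the question's setup might look with the write ahead log enabled (the host, port and checkpoint path are the ones from the question; the serialized storage level follows the Spark docs' suggestion to drop in-memory replication once the WAL already persists the received data):

import org.apache.spark.storage.StorageLevel;

SparkConf sparkConf = new SparkConf();
// Persist received blocks to the write ahead log on fault-tolerant storage
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true");

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(5));
jssc.checkpoint("/export/spark/checkpoint"); // NFS-mounted directory from the question

// With the WAL enabled, in-memory replication is redundant, so a serialized
// memory-and-disk storage level is typically used for the receiver.
JavaDStream<String> lines = jssc.socketTextStream(
        "192.168.8.130", 7777, StorageLevel.MEMORY_AND_DISK_SER());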

Upvotes: 1

zsxwing

Reputation: 20826

I don't know how you stopped the StreamingContext. However, for receiver-based streams, you need to set spark.streaming.receiver.writeAheadLog.enable to true to enable write ahead logs. Otherwise, as you have already seen, the last batch may be lost because Spark Streaming cannot replay it.

See http://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics for more details.
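
On the stopping side, a minimal sketch of a graceful stop, assuming the driver is shut down by a normal JVM shutdown (Ctrl-C/SIGTERM) rather than a hard kill:

// Finish processing the data already received before shutting down.
// Arguments: stopSparkContext = true, stopGracefully = true
Runtime.getRuntime().addShutdownHook(new Thread(() -> jssc.stop(true, true)));

Alternatively, setting spark.streaming.stopGracefullyOnShutdown to true lets Spark install an equivalent shutdown hook itself.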

Upvotes: 2
