Prashant Pandey

Reputation: 4662

Spark Structured Streaming Batch Read Checkpointing

I am fairly new to Spark and am still learning. One of the more difficult concepts I have come across is checkpointing and how Spark uses it to recover from failures. I am doing batch reads from Kafka using Structured Streaming and writing them to S3 as Parquet files:

dataset
    .write()
    .mode(SaveMode.Append)
    .option("checkpointLocation", checkpointLocation)
    .partitionBy("date_hour")
    .parquet(getS3PathForTopic(topicName));

The checkpoint location is an S3 filesystem path. However, as the job runs, I see no checkpoint files being written. In subsequent runs, I see the following log:

21/10/14 12:20:51 INFO ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-relation-54f0cc87-e437-4582-b998-a33189e90bd7-driver-0-5, groupId=spark-kafka-relation-54f0cc87-e437-4582-b998-a33189e90bd7-driver-0] Found no committed offset for partition topic-1

This indicates that the previous run did not checkpoint any offsets for this run to pick up from, so the job keeps consuming from the earliest offset.

How can I make my job pick up new offsets? Note that this is a batch query as described here.

This is how I read:

Dataset<Row> dataset = sparkSession
        .read()
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaProperties.bootstrapServers())
        .option("subscribe", topic)
        .option("kafka.security.protocol", "SSL")
        .option("kafka.ssl.truststore.location", sslConfig.truststoreLocation())
        .option("kafka.ssl.truststore.password", sslConfig.truststorePassword())
        .option("kafka.ssl.keystore.location", sslConfig.keystoreLocation())
        .option("kafka.ssl.keystore.password", sslConfig.keystorePassword())
        .option("kafka.ssl.endpoint.identification.algorithm", "")
        .option("failOnDataLoss", "true")
        .load();

Upvotes: 1

Views: 1434

Answers (1)

Ged

Reputation: 18108

I am not sure why batch mode for Spark Structured Streaming with Kafka still exists. If you wish to use it, then you must code your own offset management: a batch query does not use the checkpointLocation, so every run starts from startingOffsets (earliest by default). See the Structured Streaming + Kafka integration guide, but this part is badly explained there. A sketch of the manual approach follows.
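For illustration, a minimal sketch of manual offset management in batch mode. The helpers loadSavedOffsetsJson and saveNextOffsetsJson are hypothetical stand-ins for whatever store you persist offsets in (an S3 object, a database row, etc.); the real mechanism is the startingOffsets/endingOffsets options, which accept a JSON map of topic to partition offsets:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.max;

// Hypothetical helper: returns the JSON saved by the previous run,
// e.g. {"my-topic":{"0":1234,"1":5678}}, or null on the first run.
String startingOffsets = loadSavedOffsetsJson(topic);
if (startingOffsets == null) {
    startingOffsets = "earliest";
}

Dataset<Row> dataset = sparkSession
        .read()
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaProperties.bootstrapServers())
        .option("subscribe", topic)
        .option("startingOffsets", startingOffsets) // resume where the last run stopped
        .option("endingOffsets", "latest")
        .load();

// ... write dataset to S3 as in the question ...

// The starting offset in the JSON is inclusive, so the next run should
// begin at max(offset) + 1 for each partition read in this run.
java.util.List<Row> nextOffsets = dataset
        .groupBy(col("partition"))
        .agg(max(col("offset")).plus(1).as("nextOffset"))
        .collectAsList();
saveNextOffsetsJson(topic, nextOffsets); // hypothetical: serialize back to the JSON format above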

I would say Trigger.Once is a better fit for your use case: the query still runs once and stops, but because it is a streaming query rather than a batch one, offset management is provided by Spark through the checkpoint location.
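A minimal sketch of that approach, reusing the variables from the question (the kafka.ssl.* options are elided): switch to readStream()/writeStream(), keep checkpointLocation on the writer, and Spark records the consumed offsets in the checkpoint so each run picks up where the previous one stopped.

import org.apache.spark.sql.streaming.Trigger;

sparkSession
        .readStream()                   // streaming source instead of read()
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaProperties.bootstrapServers())
        .option("subscribe", topic)
        // ... same kafka.ssl.* options as in the question ...
        .load()
        .writeStream()
        .format("parquet")
        .option("path", getS3PathForTopic(topicName))
        .option("checkpointLocation", checkpointLocation) // offsets are committed here
        .partitionBy("date_hour")
        .trigger(Trigger.Once())        // process available data once, then stop
        .start()
        .awaitTermination();

On Spark 3.3+, Trigger.AvailableNow() is the recommended replacement for Trigger.Once().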

Upvotes: 1
