Shay

Reputation: 505

Spark streaming with kafka - restarting from checkpoint

We are building a fault-tolerant system using Spark Streaming and Kafka, and are testing Spark Streaming checkpointing to give us the option of restarting the Spark job if it crashes for any reason. Here's what our Spark process looks like:

What we want to achieve is a setup where we can bring down the Spark Streaming job (to mimic a failure) and then restart it, while still ensuring that we process every message from Kafka. This seems to work fine, but here is what I see and don't know what to make of:

Any input on this would be appreciated.

[screenshot of the Spark Streaming web UI]

Upvotes: 2

Views: 2524

Answers (2)

Yulin GUO

Reputation: 159

Is this expected? Why are batches being created when they don't process any data

In fact, when Spark Streaming with Kafka recovers from a checkpoint, Spark first regenerates the jobs for the data it had not yet processed. All of that data is processed in one or more batches (how many depends on configuration), but in the web UI those recovered batches are shown as executing with 0 events.

There is also a second thing which is confusing ...

Yes, the web UI is confusing here. Try counting the number of events in each batch and printing it:

dstream.foreachRDD(rdd => println(rdd.count()))

You'll find that Spark really does process the batches recreated from the checkpoint, even though the web UI reports them with 0 events.

If your application struggles to process all the backlogged events in a single batch after recovering from a failure, how do you control the number of batches Spark creates?

Look at spark.streaming.kafka.maxRatePerPartition: the maximum rate (number of records per second) at which data will be read from each Kafka partition when using the new Kafka direct stream API.

maxRatePerPartition * kafkaPartitionCount * batchDurationInSeconds * N = eventsToProcess

where N is the number of batches Spark needs to process after recovering from the checkpoint.
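
For example, a minimal sketch of capping the rate via SparkConf (the app name, partition count, batch interval and rate below are made-up values for illustration, not taken from the question):

import org.apache.spark.SparkConf

// Made-up numbers: with 4 Kafka partitions, a 10-second batch interval and
// maxRatePerPartition = 100, each recovered batch is capped at
// 100 * 4 * 10 = 4000 records, so the backlog is spread over N such batches.
val conf = new SparkConf()
  .setAppName("kafka-checkpoint-recovery")
  .set("spark.streaming.kafka.maxRatePerPartition", "100")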

Upvotes: 0

Yuval Itzchakov

Reputation: 149618

Is this expected? Why are batches being created when they don't process any data

That's what Spark's fault-tolerance semantics guarantee: even if your service fails, it can pick up from the last processed point in time and continue processing. Spark reads the checkpointed data and runs the recovery process until it reaches the current point in time. Spark isn't aware of 0-event batches, and thus does nothing to optimize them away.
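
For reference, the usual recovery pattern looks roughly like the sketch below (the checkpoint directory, app name and batch interval are placeholders, not your actual setup); StreamingContext.getOrCreate is what replays the checkpointed, not-yet-processed batches on restart:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder path; in practice the checkpoint directory should live on a
// fault-tolerant store such as HDFS or S3.
val checkpointDir = "/tmp/spark-checkpoint"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("kafka-checkpoint-recovery")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // ... build the Kafka direct stream and the processing graph here ...
  ssc
}

// On a clean start this calls createContext(); after a crash it rebuilds the
// context, and the pending batches, from the checkpoint data instead.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()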

It looks like these messages are being processed, but I don't see any batch on the UI that has an input size of 4800

This may happen for various reasons. A common one is having Spark's back-pressure flag set to true. Spark sees that you have a significant processing delay, so it reduces the number of messages read per batch in order to allow the streaming job to catch up.
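
If that's the case, the flag in question is spark.streaming.backpressure.enabled; a minimal sketch of how it is typically set, with illustrative values only:

import org.apache.spark.SparkConf

// Illustrative settings: back pressure lets Spark adapt the per-batch read
// rate to the observed processing delay; maxRatePerPartition caps the rate
// for the first (and recovered) batches, where back pressure has no history yet.
val conf = new SparkConf()
  .setAppName("kafka-checkpoint-recovery")
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.kafka.maxRatePerPartition", "100")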

Upvotes: 1
