Regenschein

Reputation: 1502

Restarting Spark Structured Streaming Job consumes Millions of Kafka messages and dies

We have a Spark Structured Streaming application running on Spark 2.3.3.

Basically, it opens a Kafka stream:

    kafka_stream = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "mykafka:9092") \
        .option("subscribe", "mytopic") \
        .load()

The Kafka topic has 2 partitions. After that, we apply some basic filtering operations, some Python UDFs, and an explode() on one column, like:

    stream = apply_operations(kafka_stream)

where apply_operations does all the work on the data. For illustration, here is a minimal sketch of what such a function might look like; the column names, the UDF, and the split logic are assumptions, since the real function is not shown:
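
    from pyspark.sql import functions as F
    from pyspark.sql.types import ArrayType, StringType

    # Hypothetical UDF: split the message value into parts (assumed logic).
    split_parts = F.udf(lambda s: s.split(",") if s else [], ArrayType(StringType()))

    def apply_operations(df):
        # Cast the raw Kafka value to a string, drop empty records,
        # apply the UDF, and explode the resulting array into rows.
        return df \
            .selectExpr("CAST(value AS STRING) AS value") \
            .filter(F.col("value").isNotNull()) \
            .withColumn("parts", split_parts("value")) \
            .withColumn("part", F.explode("parts"))

In the end, we would like to write the stream to a sink, e.g.: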

    stream.writeStream \
        .format("our.java.sink.Class") \
        .option("some-option", "value") \
        .trigger(processingTime='15 seconds') \
        .start()

To let this stream operation run forever, we call the following at the end of the script:

    spark.streams.awaitAnyTermination()

So far, so good. Everything ran fine for days. But then, due to a network problem, the job was down for a few days, and there are now millions of messages in the Kafka topic waiting to be caught up on.

When we restart the streaming job via spark-submit, the first batch is far too large and takes ages to complete. We thought there might be a parameter to limit the size of that first batch, but we did not find anything that helped.

We tried several rate-limiting settings, including the maxOffsetsPerTrigger option on the readStream, but none of them seemed to have any effect.

Now, after we restart the pipeline, sooner or later the whole Spark job crashes again, and we cannot simply give the job more memory or cores.

Is there anything we missed that would let us control the number of events processed in one streaming batch?

Upvotes: 10

Views: 1687

Answers (1)

cronoik

Reputation: 19365

You wrote in the comments that you are using spark-streaming-kafka-0-8_2.11, and that API version cannot handle maxOffsetsPerTrigger (or, as far as I know, any other mechanism for reducing the number of consumed messages), since that was only implemented for the newer API spark-streaming-kafka-0-10_2.11. According to the documentation, this newer API also works with your Kafka version 0.10.2.2.
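
For illustration, a minimal sketch of how the first batch could be capped once the newer connector is on the classpath; the artifact coordinates and the cap of 10000 offsets below are assumptions to adapt, not values from the question:

    # Submit with the Structured Streaming Kafka source on the classpath, e.g.:
    #   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.3 our_job.py

    kafka_stream = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "mykafka:9092") \
        .option("subscribe", "mytopic") \
        .option("maxOffsetsPerTrigger", 10000) \
        .load()

With this cap, each micro-batch reads at most 10000 offsets across the two partitions, so the backlog is drained in bounded chunks instead of one giant first batch.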

Upvotes: 6
