Reputation: 41
I am using Spark 2.3 Structured Streaming to read messages from Kafka and write them to Postgres (using the Scala programming language). My application is meant to be long-running, and it should be able to handle any failure without human intervention.
I have been looking for ways to catch unexpected errors in Structured Streaming, and I found this example here:
Spark Structured Streaming exception handling
This way it is possible to catch all errors thrown in the stream, but the problem is that when the application retries, it gets stuck on the same exception again.
Is there a way in Structured Streaming to handle the error and tell Spark to increment the offset in the checkpoint location programmatically, so that it proceeds to consume the next message without getting stuck?
Upvotes: 3
Views: 4062
Reputation: 1821
In the streaming event-processing world this is called handling a "poison pill". Please have a look at the following link: https://www.waitingforcode.com/apache-spark-structured-streaming/corrupted-records-poison-pill-records-apache-spark-structured-streaming/read
It suggests several ways to handle this type of scenario:
Strategy 1: let it crash. The streaming application will log the poison pill message and stop processing. That is not a big deal in itself, because thanks to the checkpointed offsets we can reprocess the data and handle it accordingly, perhaps with a try-catch block. However, as you already saw in your question, this is not good practice in streaming systems: the consumer stops, and during that idle period it accumulates lag while the producer continues to generate data.
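A minimal sketch of the "let it crash" approach, restarting the query in a retry loop so a failure does not end the application. `buildQuery` is an assumed helper that (re)creates the streaming query; note the restarted query resumes from the checkpointed offsets, which is exactly why it can hit the same poison pill again:

```scala
import org.apache.spark.sql.streaming.StreamingQuery

// buildQuery is a hypothetical factory that wires up the Kafka source,
// the transformations, and the Postgres sink, then starts the query.
def runForever(buildQuery: () => StreamingQuery): Unit = {
  while (true) {
    val query = buildQuery()
    try {
      query.awaitTermination()
    } catch {
      case e: Exception =>
        // Log and restart. The query restarts from the checkpointed
        // offsets, so a poison record will be re-read on every attempt.
        println(s"Stream failed, restarting: ${e.getMessage}")
    }
  }
}
```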
Strategy 2: ignore errors. If you don't want downtime for your consumer, you can simply skip the corrupted events. In Structured Streaming this boils down to filtering out null records and, optionally, logging the unparseable messages (or any records that raise an error) for further investigation.
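A minimal sketch of this filtering approach: `from_json` returns null for input that does not match the schema, so dropping the nulls silently skips corrupted events. The schema, bootstrap servers, and topic name are assumptions for illustration:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

val spark = SparkSession.builder().appName("skip-poison-pills").getOrCreate()

// Assumed message structure for the example.
val schema = new StructType()
  .add("id", StringType)
  .add("payload", StringType)

val parsed = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // assumed
  .option("subscribe", "events")                       // assumed topic
  .load()
  .select(from_json(col("value").cast("string"), schema).as("data"))

// from_json yields null for records that fail to parse against the schema,
// so this filter drops the corrupted events and keeps the stream alive.
val clean = parsed.filter(col("data").isNotNull).select("data.*")
```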
Strategy 3: Dead Letter Queue. We still ignore the errors, but instead of just logging the bad records we dispatch them to another data store.
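A sketch of a dead letter queue, assuming `raw` is the DataFrame read from the Kafka source and `schema` is the expected structure (as in the filtering example): the stream is split on whether parsing succeeded, and failed records are routed to a separate sink, here another Kafka topic. The topic name and checkpoint path are illustrative assumptions:

```scala
import org.apache.spark.sql.functions.{col, from_json}

val withParse = raw.select(
  col("value").cast("string").as("raw"),
  from_json(col("value").cast("string"), schema).as("data"))

// Good records continue to the normal sink...
val good = withParse.filter(col("data").isNotNull).select("data.*")

// ...while unparseable records go to a dead-letter topic for later review.
val dead = withParse.filter(col("data").isNull)
  .select(col("raw").cast("binary").as("value"))

dead.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // assumed
  .option("topic", "events-dead-letter")               // assumed DLQ topic
  .option("checkpointLocation", "/tmp/dlq-checkpoint") // assumed path
  .start()
```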
Strategy 4: sentinel value. You can use a pattern called Sentinel Value, and it combines freely with a Dead Letter Queue. A sentinel value is a single well-known value returned every time something goes wrong. So in your case, whenever a record cannot be converted to the structure you're processing, you emit a common placeholder object.
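A sketch of the sentinel value pattern: a parsing function that returns a fixed marker record on failure, so downstream code can detect (and, if desired, dead-letter) troubled records by comparing against that marker. `Event` and the `parse` helper are assumptions for illustration:

```scala
// Assumed record type for the example.
case class Event(id: String, payload: String)

// The agreed-upon sentinel: one well-known value emitted on any failure.
val Sentinel = Event("SENTINEL", "")

def parseOrSentinel(raw: String): Event =
  try {
    parse(raw) // parse is a hypothetical JSON-to-Event helper
  } catch {
    case _: Exception => Sentinel
  }

// Downstream, sentinel records can be counted, logged, or routed to a
// dead letter queue instead of crashing the stream:
//   events.filter(_ == Sentinel)
```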
For full code samples, see the link above.
Upvotes: 2