In a consumer poll loop in Kafka, when a SerializationException is thrown by the poll method, is there a way to just skip this message (aka a 'poison pill') and continue consuming the next event from the topic?
I could catch the exception and use the consumer's seek() method to move the offset to the next message, but seek() requires the partition and the offset as input parameters. Is there a way to get those values?
I have example code in a GitHub repository. To run the example:
$ git clone
$ cd kafka-streams-examples-gradle
$ ./gradlew build -x test
$ ./gradlew test --tests no.test.SerializationExceptionExample
The example produces three events to Kafka. The second event causes the SerializationException, which is caught and logged. At this point I would like to move the offset past this event. Instead, the exception is thrown again on the next iteration of the poll loop, so the third event is never consumed and the test fails.
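To make the failure mode concrete, here is a toy, in-memory model of a single partition. The classes ToyPartitionConsumer and PoisonPillDemo are hypothetical and are NOT the real KafkaConsumer API; the model only assumes the behaviour described above: a failing record is not skipped automatically, so every later poll hits it again until the position is explicitly moved past it.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only (hypothetical classes): NOT the real KafkaConsumer API.
public class PoisonPillDemo {

    static class ToyPartitionConsumer {
        private final List<String> log; // raw records; "BAD" stands for an undeserializable value
        private long position = 0;      // next offset to read

        ToyPartitionConsumer(List<String> log) { this.log = log; }

        // Returns records from the current position. On a bad record it returns the good
        // records read so far, or throws if the bad record is first; it never advances
        // past the bad record, so the next call fails again.
        List<String> poll() {
            List<String> out = new ArrayList<>();
            while (position < log.size()) {
                String raw = log.get((int) position);
                if (raw.equals("BAD")) {
                    if (out.isEmpty()) {
                        throw new IllegalStateException("Error deserializing at offset " + position);
                    }
                    return out;
                }
                out.add(raw);
                position++;
            }
            return out;
        }

        void seek(long offset) { position = offset; }

        long position() { return position; }
    }

    // Polls a fixed number of times; optionally seeks one past the failing record.
    static List<String> consumeAll(ToyPartitionConsumer c, boolean skipPoisonPills, int maxPolls) {
        List<String> consumed = new ArrayList<>();
        for (int i = 0; i < maxPolls; i++) {
            try {
                consumed.addAll(c.poll());
            } catch (IllegalStateException e) {
                if (skipPoisonPills) {
                    c.seek(c.position() + 1); // skip the record that failed
                }
            }
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<String> log = List.of("event1", "BAD", "event3");
        System.out.println(consumeAll(new ToyPartitionConsumer(log), false, 5)); // [event1]
        System.out.println(consumeAll(new ToyPartitionConsumer(log), true, 5));  // [event1, event3]
    }
}
```

Without the seek the loop is stuck on the second record forever, which matches the failing test; with seek(position + 1) the third event is reached. The open question is how to obtain that position (topic, partition, offset) from the real client.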
I am aware of this open issue on the same topic, but it mentions Kafka client version <, while I use version 1.0.0:
I am also aware that I could probably solve it by using Kafka Streams and the new functionality for handling poison pills there (KIP-161: Streams deserialization exception handlers).
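For reference, the Kafka Streams route is a configuration setting rather than code in the poll loop. The sketch below builds the Streams properties with plain string keys so it compiles without the Kafka jars; "default.deserialization.exception.handler" is the key behind StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, and LogAndContinueExceptionHandler is the built-in handler that logs and skips the bad record. The application id and broker address are placeholder assumptions.

```java
import java.util.Properties;

// Sketch of the KIP-161 option for a Kafka Streams application (Kafka 1.0.0+).
public class StreamsPoisonPillConfig {

    public static Properties streamsProperties() {
        Properties props = new Properties();
        // Placeholder values: replace with your own application id and brokers.
        props.put("application.id", "poison-pill-example");
        props.put("bootstrap.servers", "localhost:9092");
        // Same key as StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG:
        // log records that fail deserialization and continue, instead of failing the stream thread.
        props.put("default.deserialization.exception.handler",
                "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProperties().getProperty("default.deserialization.exception.handler"));
    }
}
```

These properties would then be passed to the KafkaStreams constructor together with the topology. This only helps if the consumer can be replaced by a Streams application; it does not change the behaviour of a plain KafkaConsumer.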
What led me to look into this in the first place was the following exception (the example code triggers a different SerializationException, since I was not able to recreate this one):
Exception in thread "SimpleAsyncTaskExecutor-3" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition apcitxpt-1 at offset 339798. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: java.nio.BufferUnderflowException
at java.nio.Buffer.nextGetIndex(
at java.nio.HeapByteBuffer.get(
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.getByteBuffer(
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(
at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
The solution I found is to parse the SerializationException message to extract the required data: topic name, partition and offset.
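catch (SerializationException se) {
    // Message looks like: "Error deserializing key/value for partition <topic>-<partition> at offset <offset>. If needed, ..."
    String s = se.getMessage().split("Error deserializing key/value for partition ")[1].split("\\. If needed")[0];
    String[] parts = s.split(" at offset ");          // e.g. ["apcitxpt-1", "339798"]
    int dash = parts[0].lastIndexOf('-');             // topic names may themselves contain '-'
    String topic = parts[0].substring(0, dash);
    int partition = Integer.parseInt(parts[0].substring(dash + 1));
    long offset = Long.parseLong(parts[1]);           // offsets are longs, not ints
    TopicPartition topicPartition = new TopicPartition(topic, partition);
    logger.debug("Skipping {}-{} offset {}", topic, partition, offset);
    consumer.seek(topicPartition, offset + 1L);       // move past the poison pill
}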
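The same parsing can be done with a single regular expression, which avoids index juggling and returns nothing for SerializationExceptions that carry a different message (such as the "Error deserializing Avro message for id -1" cause in the question). This is a sketch under one loud assumption: the message wording produced by the 1.0.0 client is not a stable API and may change between versions. PoisonPillLocator and Location are hypothetical names used only for illustration.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper. ASSUMPTION: the message has the form seen in the question, e.g.
// "Error deserializing key/value for partition apcitxpt-1 at offset 339798. If needed, ..."
public class PoisonPillLocator {

    /** Hypothetical value holder; in consumer code, build a TopicPartition from topic/partition. */
    public static final class Location {
        public final String topic;
        public final int partition;
        public final long offset;

        Location(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Greedy (.+) backtracks to the LAST "-<digits>" before " at offset",
    // so topic names that contain '-' are handled.
    private static final Pattern MESSAGE = Pattern.compile(
            "Error deserializing key/value for partition (.+)-(\\d+) at offset (\\d+)");

    public static Optional<Location> parse(String message) {
        if (message == null) {
            return Optional.empty();
        }
        Matcher m = MESSAGE.matcher(message);
        if (!m.find()) {
            return Optional.empty(); // some other SerializationException: don't guess
        }
        return Optional.of(new Location(
                m.group(1),
                Integer.parseInt(m.group(2)),
                Long.parseLong(m.group(3))));
    }

    public static void main(String[] args) {
        String msg = "Error deserializing key/value for partition my-topic-v2-1 at offset 339798."
                + " If needed, please seek past the record to continue consumption.";
        parse(msg).ifPresent(loc ->
                System.out.println(loc.topic + " / " + loc.partition + " / " + loc.offset));
    }
}
```

In the catch block, the result would be used roughly as parse(se.getMessage()).ifPresent(loc -> consumer.seek(new TopicPartition(loc.topic, loc.partition), loc.offset + 1)); if parsing fails, rethrowing is safer than guessing an offset.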