Reputation: 84
In a consumer poll loop in Kafka, when a SerializationException is thrown by the poll method, is there a way to just skip this message (aka a 'poison pill') and go on consuming the next event from the topic?
I could catch the exception and use the consumer.seek() method to move the offset to the next message, but that method requires the partition and the offset as input parameters. Is there a way to get those values?
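To illustrate, here is a simplified sketch of the kind of poll loop I mean (consumer setup omitted; consumer, logger and handle() are stand-ins for my real code):

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.SerializationException;

while (true) {
    try {
        // poll(long) is the Kafka 1.0.0 API
        ConsumerRecords<String, GenericRecord> records = consumer.poll(1000);
        for (ConsumerRecord<String, GenericRecord> record : records) {
            handle(record);
        }
    } catch (SerializationException se) {
        logger.warn("Poison pill encountered", se);
        // I would like to call consumer.seek(topicPartition, offset + 1) here,
        // but the exception does not expose the partition or the offset
    }
}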
I have example code in a GitHub repo. To run the example:
$ git clone https://github.com/bjornhjelle/kafka-streams-examples-gradle.git
$ cd kafka-streams-examples-gradle
$ ./gradlew build -x test
$ ./gradlew test --tests no.test.SerializationExceptionExample
The example produces three events to Kafka. The second event causes the SerializationException. The exception is caught and logged. At this point I would like to move the offset past this event. Instead, the exception is thrown again in the poll loop. The third event is therefore never consumed, and the test fails.
I am aware of this open issue on the same topic, but it mentions Kafka client version < 0.10.0.1, while I use version 1.0.0: https://issues.apache.org/jira/browse/KAFKA-4740
I am also aware that I could probably solve this by using Kafka Streams and its new functionality for handling poison pills (KIP-161: streams deserialization exception handlers).
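For completeness, I believe that configuration would look roughly like this (I have not tried it; the names are from the Kafka 1.0 Streams API):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

Properties props = new Properties();
// Log and skip records that fail deserialization instead of crashing the application
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
          LogAndContinueExceptionHandler.class);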
What caused me to look into this in the first place was the exception below (the example code triggers a different SerializationException, since I was not able to re-create this exact one):
Exception in thread "SimpleAsyncTaskExecutor-3" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition apcitxpt-1 at offset 339798. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: java.nio.BufferUnderflowException
at java.nio.Buffer.nextGetIndex(Buffer.java:500)
at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:135)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.getByteBuffer(AbstractKafkaAvroDeserializer.java:77)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:119)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:93)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1170)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
at no.ruter.nextgen.kafkaConsumerRunners.ApcInitRunner.run(ApcInitRunner.java:63)
at java.lang.Thread.run(Thread.java:745)
Upvotes: 4
Views: 4352
Reputation: 21
The solution I found is to parse the serialization exception message to get the required data: topic name, partition, and offset:
catch (SerializationException se) {
    // Message format: "Error deserializing key/value for partition <topic>-<partition> at offset <offset>. If needed, please seek past the record to continue consumption."
    String s = se.getMessage()
            .split("Error deserializing key/value for partition ")[1]
            .split(". If needed, please seek past the record to continue consumption.")[0];
    // Split on the last '-' before " at offset " so topic names containing '-' still parse
    int dash = s.lastIndexOf('-', s.indexOf(" at offset "));
    String topic = s.substring(0, dash);
    int partition = Integer.parseInt(s.substring(dash + 1, s.indexOf(" at offset ")));
    long offset = Long.parseLong(s.split(" at offset ")[1]); // offsets are longs, not ints
    TopicPartition topicPartition = new TopicPartition(topic, partition);
    logger.debug("Skipping {}-{} offset {}", topic, partition, offset);
    consumer.seek(topicPartition, offset + 1L);
}
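Note that this approach depends on the exact wording of the exception message, so it is fragile: if a future client version changes the message, the parsing will break.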
Upvotes: 2