Reputation: 1549
To keep my service reliable, I need to push every incoming message that cannot be deserialized to a dead-letter topic, using smallrye-kafka and Quarkus.
All messages on the topic should be in Avro format (but I cannot be sure of that), with a schema defined in a schema registry.
I have set the configuration of my consumer in this way:
mp:
  messaging:
    incoming:
      test-in:
        connector: smallrye-kafka
        group:
          id: test-in-consumer-group
        topic: events-topic
        failure-strategy: dead-letter-queue
        schema:
          registry:
            url: http://localhost:8081
        value:
          deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
          subject:
            name:
              strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
        key:
          deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
          subject:
            name:
              strategy: io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
        specific:
          avro:
            reader: true
My consumer code:
@ApplicationScoped
public class Consumer {

    @Incoming("test-in")
    public CompletionStage<Void> store(KafkaRecord<Key, SpecificRecord> data) {
        String schemaFullName = data.getPayload().getSchema().getFullName();
        System.out.println(schemaFullName);
        // other consumer code
        return data.ack();
    }
}
When the consumer can't deserialize a message, consumption is blocked instead of the message being moved to the dead-letter topic and processing continuing. I suppose that a deserialization error does not produce a nack, so the message is never moved to the dead-letter topic.
Is there a way to move messages that cannot be deserialized to the dead-letter topic?
Upvotes: 4
Views: 2923
Reputation: 66
I resolved this using a DeserializationFailureHandler. You have to treat the dead-letter topic as a normal outgoing topic and send the failed record to it yourself.
@ApplicationScoped
@Identifier("failure-dead-letter") // Set the name of the failure handler
public class MyDeserializationFailureHandler
        implements DeserializationFailureHandler<CustomBean> { // Specify the expected type

    private static final Logger LOGGER = Logger.getLogger(MyDeserializationFailureHandler.class);

    @Inject
    @Channel("dead-letter")
    Emitter<DeadLetterBean> deadLetterBeanEmitter;

    @Override
    public CustomBean handleDeserializationFailure(String topic, boolean isKey, String deserializer, byte[] data,
            Exception exception, Headers headers) {
        LOGGER.error("ERROR: " + exception.getMessage());
        deadLetterBeanEmitter.send(Message.of(new DeadLetterBean(topic, isKey, deserializer, data, exception))
                .withAck(() -> {
                    // Called when the message is acked
                    LOGGER.error("SENT TO DEAD LETTER");
                    return CompletableFuture.completedFuture(null);
                })
                .withNack(throwable -> {
                    // Called when the message is nacked
                    LOGGER.error("ERROR, NOT SENT DEAD LETTER");
                    return CompletableFuture.completedFuture(null);
                }));
        return null;
    }
}
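The handler above builds a DeadLetterBean, which is not shown. A minimal sketch of what such a class could look like follows, assuming it only needs to carry the five constructor arguments used above. The field names, the getters, and the choice to store the exception as a string (rather than the Exception object itself, which tends to serialize poorly to JSON) are my assumptions, not part of the original code.

```java
import java.util.Arrays;

// Hypothetical payload for the dead-letter topic; the names are illustrative only.
final class DeadLetterBean {
    private final String topic;        // topic the failed record came from
    private final boolean key;         // true if the key (not the value) failed to deserialize
    private final String deserializer; // class name of the deserializer that failed
    private final byte[] data;         // raw bytes of the record part that failed
    private final String error;        // exception type and message, kept as plain text

    DeadLetterBean(String topic, boolean isKey, String deserializer, byte[] data, Exception exception) {
        this.topic = topic;
        this.key = isKey;
        this.deserializer = deserializer;
        // Defensive copy so later changes to the caller's array do not leak in.
        this.data = data == null ? null : Arrays.copyOf(data, data.length);
        this.error = exception == null
                ? null
                : exception.getClass().getName() + ": " + exception.getMessage();
    }

    // Public getters so a JSON serializer can discover the properties.
    public String getTopic() { return topic; }
    public boolean isKey() { return key; }
    public String getDeserializer() { return deserializer; }
    public byte[] getData() { return data == null ? null : Arrays.copyOf(data, data.length); }
    public String getError() { return error; }
}
```

Keeping the raw bytes means the original record can be inspected or replayed later, once the schema problem is understood.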
Also register the new topic as an outgoing channel, and set the deserialization failure handler on the incoming channel:
mp.messaging.outgoing.dead-letter.topic=dead-letter-topic-name
mp.messaging.outgoing.dead-letter.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer
mp.messaging.incoming.message-in.value-deserialization-failure-handler=failure-dead-letter
Upvotes: 5