Anadi Misra

How to get RetryAdvice working for KafkaProducerMessageHandler

I'm trying to write a RetryAdvice for a Kafka handler, falling back to saving the message in MongoDB as the RecoveryCallback.
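For orientation, here is a framework-free sketch of the retry-then-recover flow this setup is aiming for: the operation is attempted up to maxAttempts times, and the recovery callback is invoked only after every attempt has thrown. It is a plain-Java model under simplifying assumptions (no backoff, every RuntimeException is retryable), and RetryThenRecover and execute are hypothetical names. In Spring Integration, RequestHandlerRetryAdvice delegates this loop to a Spring Retry RetryTemplate, and ErrorMessageSendingRecoverer takes the recoverer's place by sending an ErrorMessage to a channel.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical, framework-free model of retry-then-recover (no backoff,
// all RuntimeExceptions treated as retryable): try the operation up to
// maxAttempts times; if every attempt throws, pass the last exception to
// the recoverer instead of propagating it to the caller.
final class RetryThenRecover {

    private RetryThenRecover() {
    }

    static <T> T execute(int maxAttempts, Supplier<T> attempt,
                         Function<RuntimeException, T> recoverer) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return attempt.get();      // success: the recoverer is never called
            } catch (RuntimeException ex) {
                last = ex;                 // remember the failure and try again
            }
        }
        return recoverer.apply(last);      // every attempt failed: recover
    }
}
```

In the question's terms, the attempt is the Kafka send and the recover step is the hand-off to kafkaErrorChannel, where the Mongo handler stores the failure. The caller only reaches the recover step after the last attempt has actually failed, which matters once each attempt can block for a long time.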

@Bean(name = "kafkaSuccessChannel")
public ExecutorChannel kafkaSuccessChannel() {
    return MessageChannels.executor("kafkaSuccessChannel", asyncExecutor()).get();
}

@Bean(name = "kafkaErrorChannel")
public ExecutorChannel kafkaErrorChannel() {
    // use this channel's own name here, not "kafkaSuccessChannel"
    return MessageChannels.executor("kafkaErrorChannel", asyncExecutor()).get();
}

@Bean
@ServiceActivator(inputChannel = "kafkaPublishChannel")
public KafkaProducerMessageHandler<String, String> kafkaProducerMessageHandler(
        @Autowired ExecutorChannel kafkaSuccessChannel,
        @Autowired RequestHandlerRetryAdvice retryAdvice) {
    KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
    // assumed: this expression (e.g. "alm_jira") selects the target topic
    handler.setTopicExpression(
            new SpelExpressionParser().parseExpression(
                    "headers['" + upstreamTypeHeader + "'] + '_' + headers['" + upstreamInstanceHeader + "']"));
    handler.setSendSuccessChannel(kafkaSuccessChannel);
    // wrap this handler with the retry advice
    handler.setAdviceChain(Collections.singletonList(retryAdvice));
    // sync = true makes this Kafka handler wait for the results of Kafka operations; intended for testing only.
    handler.setSync(testMode);
    return handler;
}
I then configure the advice as follows, in the same class:

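@Bean
public RequestHandlerRetryAdvice retryAdvice(@Autowired RetryTemplate retryTemplate,
                                             @Autowired ExecutorChannel kafkaErrorChannel) {
    RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
    // use the injected template; otherwise the advice keeps its default RetryTemplate
    retryAdvice.setRetryTemplate(retryTemplate);
    // after the last failed attempt, send an ErrorMessage to kafkaErrorChannel
    retryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(kafkaErrorChannel));
    return retryAdvice;
}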
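@Bean
public RetryTemplate retryTemplate() {
    // 3 attempts in total, waiting 1 s and then 3 s between them (each wait capped at 30 s)
    return new RetryTemplateBuilder().maxAttempts(3).exponentialBackoff(1000, 3.0, 30000)
            .build();
}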
Finally, I have a Mongo handler that saves the failed message to a dedicated collection:

@Bean
@ServiceActivator(inputChannel = "kafkaErrorChannel")
public MongoDbStoringMessageHandler kafkaFailureHandler(@Autowired MongoDatabaseFactory mongoDbFactory,
                                                        @Autowired MongoConverter mongoConverter) {
    String collectionExpressionString = "headers['" + upstreamTypeHeader + "'] + '_' + headers['" + upstreamInstanceHeader + "'] + '_FAIL'";
    return getMongoDbStoringMessageHandler(mongoDbFactory, mongoConverter, collectionExpressionString);
}
I'm having a hard time figuring out whether I have wired all of this correctly, as the test never seems to work. In the test class I deliberately do not set up an embedded Kafka or connect to a real Kafka, so that message publishing fails; I expect this to trigger the retry advice and eventually save the message to the dead-letter collection in Mongo.

@Test
void testFailedKafkaPublish() {

    // dummy message
    Map<String, String> map = new HashMap<>();
    map.put("key", "value");
    // publish the message
    Message<Map<String, String>> message = MessageBuilder.withPayload(map)
            .setHeader("X-UPSTREAM-TYPE", "alm")
            .setHeader("X-INSTANCE-HEADER", "jira")
            .build();
    kafkaPublishChannel.send(message); // hypothetical injected MessageChannel field for "kafkaPublishChannel"

    // assert the failed message is saved in the FAIL collection
    assertThat(mongoTemplate.findAll(DBObject.class, "alm_jira_FAIL")).isNotEmpty();
}

I read that we have to call setSync(true) on the Kafka handler for it to wait for the results of Kafka operations, so I introduced

private boolean testMode;

into the Kafka configuration class, and in the test above I set it to true via the @TestPropertySource annotation:

@TestPropertySource(properties = {


I still cannot see any log output for the retry advice executing, nor a failed message saved in Mongo. Another idea is to use Awaitility, but I'm not sure what condition I should put in the until() method for it to work.
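On the Awaitility question: the condition to wait for is simply "the FAIL collection is no longer empty", and the timeout has to be longer than the whole retry sequence. With Awaitility that could read roughly await().atMost(Duration.ofSeconds(10)).until(() -> !mongoTemplate.findAll(DBObject.class, "alm_jira_FAIL").isEmpty()), where the 10 s figure is an assumption. The framework-free sketch below shows what such a call amounts to: re-check a condition at a fixed interval until it holds or a deadline passes. Poll.until is a hypothetical helper, not Awaitility's implementation.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical polling helper in the spirit of Awaitility's until():
// re-evaluates the condition every `interval` until it returns true or
// `timeout` has elapsed. Returns whether the condition was eventually met.
final class Poll {

    private Poll() {
    }

    static boolean until(BooleanSupplier condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;                        // timed out
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // keep the interrupt visible to the caller
                return false;
            }
        }
    }
}
```

Whichever tool is used, a too-short timeout fails in exactly the same way as the immediate assertion, so the timeout should be derived from the producer's blocking time and the retry schedule.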


After enabling debug logging for Kafka, I noticed that the producer gets into a loop, trying to reconnect to Kafka in a separate thread:

2021-03-25 10:56:02.640 DEBUG 66997 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Initiating connection to node localhost:9999 (id: -1 rack: null) using address localhost/
2021-03-25 10:56:02.641 DEBUG 66997 --- [dPoolExecutor-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Kafka producer started
2021-03-25 10:56:02.666 DEBUG 66997 --- [ad | producer-1]   : [Producer clientId=producer-1] Connection with localhost/ disconnected Connection refused
    at java.base/ Method) ~[na:na]
    at java.base/ ~[na:na]
    at java.base/ ~[na:na]
    at ~[kafka-clients-2.6.0.jar:na]
    at ~[kafka-clients-2.6.0.jar:na]
    at ~[kafka-clients-2.6.0.jar:na]
    at ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll( ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.runOnce( ~[kafka-clients-2.6.0.jar:na]
    at ~[kafka-clients-2.6.0.jar:na]
    at java.base/ ~[na:na]

Meanwhile, the test reaches the assertion, and hence it fails:

    // assert the failed message is saved in the FAIL collection
    assertThat(mongoTemplate.findAll(DBObject.class, "alm_jira_FAIL"))

So it looks like the retry advice does not take over for the first two failures.

Update 2

I updated the configuration class to add a property:

private Integer productMaxBlockDurationMs;

and added the following line to the kafkaTemplate() configuration method:

props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, productMaxBlockDurationMs);

That fixed it.

Update 3

As Gary says, we can skip adding all these properties in code entirely; I removed the following method from my class:

@Bean
KafkaTemplate<String, String> kafkaTemplate() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, productMaxBlockDurationMs);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}

and simply injected the Kafka configuration from properties instead, so I no longer have to write a KafkaTemplate bean.



Answers (1)

Gary Russell

KafkaProducers block for 60 seconds by default before failing.

Try reducing the producer's max.block.ms property.
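The effect of this setting can be estimated with simple arithmetic. Assuming every attempt blocks for the full max.block.ms (as happens when no broker is reachable and topic metadata never arrives) and ignoring other overhead, the wait before the recovery callback runs is the number of attempts times max.block.ms, plus the backoff sleeps between attempts. WorstCase and millisBeforeRecovery are hypothetical names.

```java
// Hypothetical back-of-the-envelope helper: the longest a caller can wait
// before the recovery callback runs, assuming every attempt blocks for the
// full max.block.ms and the retry policy sleeps for the given backoff delays
// between attempts. Other overhead is ignored.
final class WorstCase {

    private WorstCase() {
    }

    static long millisBeforeRecovery(int attempts, long maxBlockMs, long... backoffDelaysMs) {
        long total = attempts * maxBlockMs;      // time spent blocked inside send()
        for (long delay : backoffDelaysMs) {
            total += delay;                      // time spent sleeping between attempts
        }
        return total;
    }
}
```

With the default of 60000 ms and the question's retry template (3 attempts, waits of 1000 ms and 3000 ms), millisBeforeRecovery(3, 60_000, 1_000, 3_000) gives 184000 ms, about three minutes, so an immediate assertion (or any short Awaitility timeout) runs long before anything reaches Mongo. With max.block.ms at 500 the same template gives 5500 ms. The example below keeps RequestHandlerRetryAdvice's default RetryTemplate (three attempts, no backoff), which gives 1500 ms, consistent with the roughly 500 ms spacing of the three errors in its log.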


Here's an example:

@SpringBootApplication
public class So66768745Application {

    public static void main(String[] args) {
        SpringApplication.run(So66768745Application.class, args);
    }

    @Bean
    IntegrationFlow flow(KafkaTemplate<String, String> template, RequestHandlerRetryAdvice retryAdvice) {
        return IntegrationFlows.from(Gate.class)
                .handle(Kafka.outboundChannelAdapter(template)
                            .topic("testTopic"), e -> e
                        .advice(retryAdvice))
                .get();
    }

    @Bean
    RequestHandlerRetryAdvice retryAdvice(QueueChannel channel) {
        RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
        advice.setRecoveryCallback(new ErrorMessageSendingRecoverer(channel));
        return advice;
    }

    @Bean
    QueueChannel channel() {
        return new QueueChannel();
    }

}

interface Gate {

    void sendToKafka(String out);

}

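@SpringBootTest
@TestPropertySource(properties = {
        "spring.kafka.bootstrap-servers: localhost:9999",
        " 500" })
class So66768745ApplicationTests {

    @Autowired
    Gate gate;

    @Autowired
    QueueChannel channel;

    @Test
    void test() {
        this.gate.sendToKafka("test");
        Message<?> em = this.channel.receive(10_000); // timeout value assumed
        assertThat(em).isNotNull();
        System.out.println(em);
    }

}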

2021-03-23 15:16:13.908 ERROR 2668 --- [           main]    : Exception thrown when sending a message with key='null' and payload='test' to topic testTopic:

org.apache.kafka.common.errors.TimeoutException: Topic testTopic not present in metadata after 500 ms.

2021-03-23 15:16:14.343  WARN 2668 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node -1 (localhost/ could not be established. Broker may not be available.
2021-03-23 15:16:14.343  WARN 2668 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Bootstrap broker localhost:9999 (id: -1 rack: null) disconnected
2021-03-23 15:16:14.415 ERROR 2668 --- [           main]    : Exception thrown when sending a message with key='null' and payload='test' to topic testTopic:

org.apache.kafka.common.errors.TimeoutException: Topic testTopic not present in metadata after 500 ms.

2021-03-23 15:16:14.921 ERROR 2668 --- [           main]    : Exception thrown when sending a message with key='null' and payload='test' to topic testTopic:

org.apache.kafka.common.errors.TimeoutException: Topic testTopic not present in metadata after 500 ms.

ErrorMessage [payload=org.springframework.messaging.MessagingException: Failed to handle; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic testTopic not present in metadata after 500 ms., failedMessage=GenericMessage [payload=test, headers={replyChannel=nullChannel, errorChannel=, id=d8ce277a-3d9a-b0bc-c14b-80d63ca13858, timestamp=1616526973218}], headers={id=1a6c29d2-f8d8-adf0-7569-db7610b020ef, timestamp=1616526974921}]
