Anadi Misra

How to get RetryAdvice working for KafkaProducerMessageHandler

I'm trying to write a RetryAdvice for the Kafka handler, falling back to saving the message to MongoDB via a RecoveryCallback.

@Bean(name = "kafkaSuccessChannel")
public ExecutorChannel kafkaSuccessChannel() {
    return MessageChannels.executor("kafkaSuccessChannel", asyncExecutor()).get();
}

@Bean(name = "kafkaErrorChannel")
public ExecutorChannel kafkaErrorChannel() {
    return MessageChannels.executor("kafkaErrorChannel", asyncExecutor()).get();
}
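
For reference, nothing in this config consumes kafkaSuccessChannel yet; a hypothetical consumer like the sketch below (not part of my actual class; LOG is this class's SLF4J logger) makes successful sends visible while debugging.

@ServiceActivator(inputChannel = "kafkaSuccessChannel")
public void logKafkaSuccess(Message<?> message) {
    // hypothetical debugging aid, not part of the original configuration
    LOG.info("Kafka send succeeded: {}", message);
}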

@Bean
@ServiceActivator(inputChannel = "kafkaPublishChannel")
public KafkaProducerMessageHandler<String, String> kafkaProducerMessageHandler(
        @Autowired ExecutorChannel kafkaSuccessChannel,
        @Autowired RequestHandlerRetryAdvice retryAdvice) {
    KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setHeaderMapper(mapper());
    handler.setLoggingEnabled(true);
    handler.setTopicExpression(
            new SpelExpressionParser()
                    .parseExpression(
                            "headers['" + upstreamTypeHeader + "'] + '_' + headers['" + upstreamInstanceHeader + "']"));
    handler.setSendSuccessChannel(kafkaSuccessChannel);
    handler.setAdviceChain(Arrays.asList(retryAdvice));
    // sync = true makes this handler block until the Kafka send completes; intended for testing only.
    handler.setSync(testMode);
    return handler;
}

And then I configure the advice as follows in the same class:

@Bean
public RequestHandlerRetryAdvice retryAdvice(@Autowired RetryTemplate retryTemplate,
                                             @Autowired ExecutorChannel kafkaErrorChannel) {
    RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
    retryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(kafkaErrorChannel));
    retryAdvice.setRetryTemplate(retryTemplate);
    return retryAdvice;
}

@Bean
public RetryTemplate retryTemplate() {
    return new RetryTemplateBuilder().maxAttempts(3).exponentialBackoff(1000, 3.0, 30000)
            .retryOn(MessageHandlingException.class).build();
}
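
To make the retries visible in the logs at all, one option is to register a RetryListener on the template. This is just a sketch, assuming spring-retry 1.3+ (where RetryTemplateBuilder.withListener is available); LOG is again an SLF4J logger:

@Bean
public RetryTemplate retryTemplate() {
    return new RetryTemplateBuilder().maxAttempts(3).exponentialBackoff(1000, 3.0, 30000)
            .retryOn(MessageHandlingException.class)
            // hypothetical listener, purely to make each failed attempt visible
            .withListener(new RetryListenerSupport() {
                @Override
                public <T, E extends Throwable> void onError(RetryContext context,
                        RetryCallback<T, E> callback, Throwable throwable) {
                    LOG.warn("Kafka send attempt {} failed", context.getRetryCount(), throwable);
                }
            })
            .build();
}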

And finally I have a Mongo handler that saves the failed message to a dead-letter collection:

@Bean
@ServiceActivator(inputChannel = "kafkaErrorChannel")
public MongoDbStoringMessageHandler kafkaFailureHandler(@Autowired MongoDatabaseFactory mongoDbFactory,
                                                        @Autowired MongoConverter mongoConverter) {
    String collectionExpressionString = "headers['" + upstreamTypeHeader + "'] + '_' + headers['" + upstreamInstanceHeader + "'] + '_FAIL'";
    return getMongoDbStoringMessageHandler(mongoDbFactory, mongoConverter, collectionExpressionString);
}
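
The getMongoDbStoringMessageHandler helper isn't shown above; a minimal sketch of what such a helper could look like (hypothetical, mirroring the Spring Integration MongoDbStoringMessageHandler API):

private MongoDbStoringMessageHandler getMongoDbStoringMessageHandler(MongoDatabaseFactory mongoDbFactory,
        MongoConverter mongoConverter, String collectionExpressionString) {
    // build the handler and derive the target collection name from the SpEL expression
    MongoDbStoringMessageHandler handler = new MongoDbStoringMessageHandler(mongoDbFactory);
    handler.setMongoConverter(mongoConverter);
    handler.setCollectionNameExpression(new SpelExpressionParser().parseExpression(collectionExpressionString));
    return handler;
}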

I'm having a hard time figuring out whether I've wired all of this correctly, as the test never seems to work. In the test class I don't set up embedded Kafka or connect to a real broker, so message publishing should fail; I expect that to trigger the retry advice and eventually save the message to the dead-letter collection in Mongo.

@Test
void testFailedKafkaPublish() {

    //Dummy message
    Map<String, String> map = new HashMap<>();
    map.put("key", "value");
    // Publish Message
    Message<Map<String, String>> message = MessageBuilder.withPayload(map)
            .setHeader("X-UPSTREAM-TYPE", "alm")
            .setHeader("X-INSTANCE-HEADER", "jira")
            .build();

    kafkaGateway.publish(message);

    // assert the failed message is saved in the FAIL collection
    assertThat(mongoTemplate.findAll(DBObject.class, "alm_jira_FAIL"))
            .extracting("key")
            .containsOnly("value");
}
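
For context, kafkaGateway here is assumed to be a plain messaging gateway onto the handler's input channel (its definition isn't shown above); something like this hypothetical sketch:

@MessagingGateway(defaultRequestChannel = "kafkaPublishChannel")
public interface KafkaGateway {

    void publish(Message<Map<String, String>> message);

}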

I read that we have to call setSync(true) on the Kafka handler for it to wait for the results of Kafka operations, so I introduced

@Value("${digite.swiftalk.kafka.test-mode:false}")
private boolean testMode;

in the Kafka configuration class, and in the test above I set it to true via the @TestPropertySource annotation:

@TestPropertySource(properties = {
        "spring.main.banner-mode=off",
        "spring.data.mongodb.database=swiftalk_db",
        "spring.data.mongodb.port=29019",
        "spring.data.mongodb.host=localhost",
        "digite.swiftalk.kafka.test-mode=true",

})

I still cannot see any log output for the retry advice executing, nor a failed message saved in Mongo. Another idea is to use Awaitility, but I'm not sure what condition to put in its until() method for it to work.
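
For reference, a minimal Awaitility sketch (assuming Awaitility 4.x, with await and Duration statically imported) would poll Mongo instead of asserting once, immediately after the asynchronous publish:

// keep retrying the assertion until the failed message shows up, or 30 s elapse
await().atMost(Duration.ofSeconds(30))
        .untilAsserted(() -> assertThat(mongoTemplate.findAll(DBObject.class, "alm_jira_FAIL"))
                .extracting("key")
                .containsOnly("value"));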

Update

After adding debug logs for Kafka, I noticed the producer gets into a loop in a separate thread, trying to reconnect to Kafka:

2021-03-25 10:56:02.640 DEBUG 66997 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Initiating connection to node localhost:9999 (id: -1 rack: null) using address localhost/127.0.0.1
2021-03-25 10:56:02.641 DEBUG 66997 --- [dPoolExecutor-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Kafka producer started
2021-03-25 10:56:02.666 DEBUG 66997 --- [ad | producer-1] o.apache.kafka.common.network.Selector   : [Producer clientId=producer-1] Connection with localhost/127.0.0.1 disconnected

java.net.ConnectException: Connection refused
    at java.base/sun.nio.ch.Net.pollConnect(Native Method) ~[na:na]
    at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:660) ~[na:na]
    at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:875) ~[na:na]
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:219) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:530) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:485) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:544) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325) ~[kafka-clients-2.6.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) ~[kafka-clients-2.6.0.jar:na]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

while the test reaches the assertion on the main thread, and hence it fails:

    // assert the failed message is saved in the FAIL collection
    assertThat(mongoTemplate.findAll(DBObject.class, "alm_jira_FAIL"))
            .extracting("key")
            .containsOnly("value");

So it looks like the retry advice does not take over after the first two failures.

Update 2

Updated the configuration class to add property

@Value("${spring.kafka.producer.properties.max.block.ms:1000}")
private Integer producerMaxBlockDurationMs;

and added the following line to the kafkaTemplate() configuration method:

props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, producerMaxBlockDurationMs);

That fixed it: with the 60-second default for max.block.ms, the producer's send() was still blocked waiting for metadata when the test asserted, so no failure ever surfaced to the retry advice in time.

Update 3

As Gary says, we can skip adding all these props entirely; I removed the following method from my class

@Bean
KafkaTemplate<String, String> kafkaTemplate() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, producerMaxBlockDurationMs);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}

and simply supplied the Kafka config from properties like so, thereby not having to write a KafkaTemplate bean at all:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.properties.max.block.ms=1000
spring.kafka.producer.properties.enable.idempotence=true
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
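
With those properties in place, Spring Boot auto-configures the KafkaTemplate, so the handler bean can take it as a parameter instead of building its own producer factory. A sketch of the changed signature:

@Bean
@ServiceActivator(inputChannel = "kafkaPublishChannel")
public KafkaProducerMessageHandler<String, String> kafkaProducerMessageHandler(
        KafkaTemplate<String, String> kafkaTemplate, ExecutorChannel kafkaSuccessChannel,
        RequestHandlerRetryAdvice retryAdvice) {
    // Boot-managed template injected; the remaining setup (header mapper, topic
    // expression) is identical to the bean shown at the top of the question
    KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate);
    handler.setSendSuccessChannel(kafkaSuccessChannel);
    handler.setAdviceChain(Arrays.asList(retryAdvice));
    return handler;
}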


Answers (1)

Gary Russell

KafkaProducers block for 60 seconds by default before failing.

Try reducing the max.block.ms producer property.

https://kafka.apache.org/documentation/#producerconfigs_max.block.ms

EDIT

Here's an example:

@SpringBootApplication
public class So66768745Application {

    public static void main(String[] args) {
        SpringApplication.run(So66768745Application.class, args);
    }

    @Bean
    IntegrationFlow flow(KafkaTemplate<String, String> template, RequestHandlerRetryAdvice retryAdvice) {
        return IntegrationFlows.from(Gate.class)
                .handle(Kafka.outboundChannelAdapter(template)
                            .topic("testTopic"), e -> e
                        .advice(retryAdvice))
                .get();
    }

    @Bean
    RequestHandlerRetryAdvice retryAdvice(QueueChannel channel) {
        RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
        advice.setRecoveryCallback(new ErrorMessageSendingRecoverer(channel));
        return advice;
    }

    @Bean
    QueueChannel channel() {
        return new QueueChannel();
    }

}

interface Gate {

    void sendToKafka(String out);

}

@SpringBootTest
@TestPropertySource(properties = {
        "spring.kafka.bootstrap-servers: localhost:9999",
        "spring.kafka.producer.properties.max.block.ms: 500" })
class So66768745ApplicationTests {

    @Autowired
    Gate gate;

    @Autowired
    QueueChannel channel;

    @Test
    void test() {
        this.gate.sendToKafka("test");
        Message<?> em = this.channel.receive(60_000);
        assertThat(em).isNotNull();
        System.out.println(em);
    }

}
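
Running the test produces three timed-out send attempts, after which the ErrorMessage arrives at the recoverer: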
2021-03-23 15:16:13.908 ERROR 2668 --- [           main] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='test' to topic testTopic:

org.apache.kafka.common.errors.TimeoutException: Topic testTopic not present in metadata after 500 ms.

2021-03-23 15:16:14.343  WARN 2668 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:9999) could not be established. Broker may not be available.
2021-03-23 15:16:14.343  WARN 2668 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Bootstrap broker localhost:9999 (id: -1 rack: null) disconnected
2021-03-23 15:16:14.415 ERROR 2668 --- [           main] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='test' to topic testTopic:

org.apache.kafka.common.errors.TimeoutException: Topic testTopic not present in metadata after 500 ms.

2021-03-23 15:16:14.921 ERROR 2668 --- [           main] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='test' to topic testTopic:

org.apache.kafka.common.errors.TimeoutException: Topic testTopic not present in metadata after 500 ms.

ErrorMessage [payload=org.springframework.messaging.MessagingException: Failed to handle; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic testTopic not present in metadata after 500 ms., failedMessage=GenericMessage [payload=test, headers={replyChannel=nullChannel, errorChannel=, id=d8ce277a-3d9a-b0bc-c14b-80d63ca13858, timestamp=1616526973218}], headers={id=1a6c29d2-f8d8-adf0-7569-db7610b020ef, timestamp=1616526974921}]
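
Note that the recovered message is an ErrorMessage whose payload is the exception; if you need the original outbound message (e.g. to store it, as in the question), it is available as the failedMessage:

// the MessagingException payload carries the original message that failed to send
Message<?> failed = ((MessagingException) em.getPayload()).getFailedMessage();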
