I'm trying to attach a RetryAdvice to a Kafka handler, falling back to saving the message to MongoDB via a RecoveryCallback.
@Bean(name = "kafkaSuccessChannel")
public ExecutorChannel kafkaSuccessChannel() {
    return MessageChannels.executor("kafkaSuccessChannel", asyncExecutor()).get();
}

@Bean(name = "kafkaErrorChannel")
public ExecutorChannel kafkaErrorChannel() {
    return MessageChannels.executor("kafkaErrorChannel", asyncExecutor()).get();
}
@Bean
@ServiceActivator(inputChannel = "kafkaPublishChannel")
public KafkaProducerMessageHandler<String, String> kafkaProducerMessageHandler(
        @Autowired ExecutorChannel kafkaSuccessChannel,
        @Autowired RequestHandlerRetryAdvice retryAdvice) {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate());
    handler.setHeaderMapper(mapper());
    handler.setLoggingEnabled(TRUE);
    handler.setTopicExpression(
            new SpelExpressionParser()
                    .parseExpression(
                            "headers['" + upstreamTypeHeader + "'] + '_' + headers['" + upstreamInstanceHeader + "']"));
    handler.setSendSuccessChannel(kafkaSuccessChannel);
    handler.setAdviceChain(Arrays.asList(retryAdvice));
    // sync = true makes this handler wait for the results of Kafka operations; to be used for testing purposes only.
    handler.setSync(testMode);
    return handler;
}
And then I configure the advice as follows in the same class:
@Bean
public RequestHandlerRetryAdvice retryAdvice(@Autowired RetryTemplate retryTemplate,
        @Autowired ExecutorChannel kafkaErrorChannel) {
    RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
    retryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(kafkaErrorChannel));
    retryAdvice.setRetryTemplate(retryTemplate);
    return retryAdvice;
}
@Bean
public RetryTemplate retryTemplate() {
    return new RetryTemplateBuilder().maxAttempts(3).exponentialBackoff(1000, 3.0, 30000)
            .retryOn(MessageHandlingException.class).build();
}
And finally I have a Mongo handler that saves the failed message to a dead-letter collection:
@Bean
@ServiceActivator(inputChannel = "kafkaErrorChannel")
public MongoDbStoringMessageHandler kafkaFailureHandler(@Autowired MongoDatabaseFactory mongoDbFactory,
        @Autowired MongoConverter mongoConverter) {
    String collectionExpressionString = "headers['" + upstreamTypeHeader + "'] + '_' + headers['" + upstreamInstanceHeader + "'] + '_FAIL'";
    return getMongoDbStoringMessageHandler(mongoDbFactory, mongoConverter, collectionExpressionString);
}
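The getMongoDbStoringMessageHandler helper is not shown above; a minimal sketch of what it presumably looks like (the method body here is an assumption, not the original code) would be:

```java
// Hypothetical sketch of the helper referenced above: builds a
// MongoDbStoringMessageHandler whose target collection is derived
// from message headers via a SpEL expression.
private MongoDbStoringMessageHandler getMongoDbStoringMessageHandler(
        MongoDatabaseFactory mongoDbFactory,
        MongoConverter mongoConverter,
        String collectionExpressionString) {
    MongoDbStoringMessageHandler handler = new MongoDbStoringMessageHandler(mongoDbFactory);
    handler.setMongoConverter(mongoConverter);
    handler.setCollectionNameExpression(
            new SpelExpressionParser().parseExpression(collectionExpressionString));
    return handler;
}
```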
I'm having a hard time figuring out whether I have wired all of this correctly, as the test never seems to work. In the test class I don't set up embedded Kafka or connect to a broker, so publishing should fail; I expect this to trigger the retry advice and eventually save the message to the dead-letter collection in Mongo.
@Test
void testFailedKafkaPublish() {
    // Dummy message
    Map<String, String> map = new HashMap<>();
    map.put("key", "value");
    // Publish message
    Message<Map<String, String>> message = MessageBuilder.withPayload(map)
            .setHeader("X-UPSTREAM-TYPE", "alm")
            .setHeader("X-INSTANCE-HEADER", "jira")
            .build();
    kafkaGateway.publish(message);
    // assert the failed message is saved in the FAIL collection
    assertThat(mongoTemplate.findAll(DBObject.class, "alm_jira_FAIL"))
            .extracting("key")
            .containsOnly("value");
}
I read that we have to call setSync(true) on the Kafka handler for it to wait for the results of Kafka operations, so I introduced
@Value("${digite.swiftalk.kafka.test-mode:false}")
private boolean testMode;
to the Kafka configuration, and in the test above I set it to true via the @TestPropertySource annotation:
@TestPropertySource(properties = {
        "spring.main.banner-mode=off",
        "spring.data.mongodb.database=swiftalk_db",
        "spring.data.mongodb.port=29019",
        "spring.data.mongodb.host=localhost",
        "digite.swiftalk.kafka.test-mode=true",
})
I still cannot see any log output for the retry advice executing, or a failed message saved in Mongo. Another idea is to use Awaitility, but I'm not sure what condition to put in the until() method for it to work.
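For what it's worth, one plausible Awaitility condition would be to poll the dead-letter collection until it is non-empty, instead of asserting immediately after the gateway call (the collection name and timeout here are assumptions taken from the test above):

```java
// Poll Mongo until the recoverer has written the failed message, rather
// than asserting right after kafkaGateway.publish() returns; the write
// happens asynchronously after the retries are exhausted.
await().atMost(Duration.ofSeconds(10))
        .pollInterval(Duration.ofMillis(250))
        .until(() -> !mongoTemplate.findAll(DBObject.class, "alm_jira_FAIL").isEmpty());
```

(uses `org.awaitility.Awaitility.await` and `java.time.Duration`)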
Update
I added debug logs for Kafka and noticed the producer gets into a loop trying to reconnect to Kafka on a separate thread:
2021-03-25 10:56:02.640 DEBUG 66997 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Initiating connection to node localhost:9999 (id: -1 rack: null) using address localhost/127.0.0.1
2021-03-25 10:56:02.641 DEBUG 66997 --- [dPoolExecutor-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Kafka producer started
2021-03-25 10:56:02.666 DEBUG 66997 --- [ad | producer-1] o.apache.kafka.common.network.Selector : [Producer clientId=producer-1] Connection with localhost/127.0.0.1 disconnected
java.net.ConnectException: Connection refused
at java.base/sun.nio.ch.Net.pollConnect(Native Method) ~[na:na]
at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:660) ~[na:na]
at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:875) ~[na:na]
at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:219) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:530) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.common.network.Selector.poll(Selector.java:485) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:544) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325) ~[kafka-clients-2.6.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) ~[kafka-clients-2.6.0.jar:na]
at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
while the test reaches the assertion, and hence it fails:
// assert the failed message is saved in the FAIL collection
assertThat(mongoTemplate.findAll(DBObject.class, "alm_jira_FAIL"))
        .extracting("key")
        .containsOnly("value");
So it looks like the retry advice does not kick in for the first two failures.
Update 2
I updated the configuration class to add the property
@Value("${spring.kafka.producer.properties.max.block.ms:1000}")
private Integer productMaxBlockDurationMs;
and added the following line to the kafkaTemplate configuration method:
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, productMaxBlockDurationMs);
That fixed it.
Update 3
As Gary says, we can skip adding all these props entirely; I removed the following method from my class
@Bean
KafkaTemplate<String, String> kafkaTemplate() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, productMaxBlockDurationMs);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}
and simply configured Kafka from properties instead, so there is no need to write a KafkaTemplate bean:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.properties.max.block.ms=1000
spring.kafka.producer.properties.enable.idempotence=true
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
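With the explicit bean removed, Spring Boot auto-configures a KafkaTemplate from these properties, and the handler bean can simply take it as a parameter. A sketch of how the handler declaration above changes (only the injection is shown; the rest of the configuration is as before):

```java
// Sketch: inject the auto-configured KafkaTemplate instead of calling
// a hand-written kafkaTemplate() method.
@Bean
@ServiceActivator(inputChannel = "kafkaPublishChannel")
public KafkaProducerMessageHandler<String, String> kafkaProducerMessageHandler(
        KafkaTemplate<String, String> kafkaTemplate,
        @Autowired ExecutorChannel kafkaSuccessChannel,
        @Autowired RequestHandlerRetryAdvice retryAdvice) {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate);
    // ...rest of the handler configuration as shown earlier...
    return handler;
}
```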
Answer
KafkaProducers block for 60 seconds by default before failing. Try reducing the max.block.ms producer property.
https://kafka.apache.org/documentation/#producerconfigs_max.block.ms
EDIT
Here's an example:
@SpringBootApplication
public class So66768745Application {

    public static void main(String[] args) {
        SpringApplication.run(So66768745Application.class, args);
    }

    @Bean
    IntegrationFlow flow(KafkaTemplate<String, String> template, RequestHandlerRetryAdvice retryAdvice) {
        return IntegrationFlows.from(Gate.class)
                .handle(Kafka.outboundChannelAdapter(template)
                        .topic("testTopic"), e -> e
                                .advice(retryAdvice))
                .get();
    }

    @Bean
    RequestHandlerRetryAdvice retryAdvice(QueueChannel channel) {
        RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
        advice.setRecoveryCallback(new ErrorMessageSendingRecoverer(channel));
        return advice;
    }

    @Bean
    QueueChannel channel() {
        return new QueueChannel();
    }

}

interface Gate {

    void sendToKafka(String out);

}
@SpringBootTest
@TestPropertySource(properties = {
        "spring.kafka.bootstrap-servers: localhost:9999",
        "spring.kafka.producer.properties.max.block.ms: 500" })
class So66768745ApplicationTests {

    @Autowired
    Gate gate;

    @Autowired
    QueueChannel channel;

    @Test
    void test() {
        this.gate.sendToKafka("test");
        Message<?> em = this.channel.receive(60_000);
        assertThat(em).isNotNull();
        System.out.println(em);
    }

}
2021-03-23 15:16:13.908 ERROR 2668 --- [ main] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='test' to topic testTopic:
org.apache.kafka.common.errors.TimeoutException: Topic testTopic not present in metadata after 500 ms.
2021-03-23 15:16:14.343 WARN 2668 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:9999) could not be established. Broker may not be available.
2021-03-23 15:16:14.343 WARN 2668 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1] Bootstrap broker localhost:9999 (id: -1 rack: null) disconnected
2021-03-23 15:16:14.415 ERROR 2668 --- [ main] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='test' to topic testTopic:
org.apache.kafka.common.errors.TimeoutException: Topic testTopic not present in metadata after 500 ms.
2021-03-23 15:16:14.921 ERROR 2668 --- [ main] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='test' to topic testTopic:
org.apache.kafka.common.errors.TimeoutException: Topic testTopic not present in metadata after 500 ms.
ErrorMessage [payload=org.springframework.messaging.MessagingException: Failed to handle; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic testTopic not present in metadata after 500 ms., failedMessage=GenericMessage [payload=test, headers={replyChannel=nullChannel, errorChannel=, id=d8ce277a-3d9a-b0bc-c14b-80d63ca13858, timestamp=1616526973218}], headers={id=1a6c29d2-f8d8-adf0-7569-db7610b020ef, timestamp=1616526974921}]