Reputation: 1
I'm looking for a simple example with spring-kafka 2.2 that uses a KafkaListener and retries the last failed message. If a message fails, it should be redirected to another topic where the retry attempts will be made. We will have 4 topics: topic, retryTopic, successTopic and errorTopic. If processing a message from topic fails, it should be redirected to retryTopic, where 3 retry attempts will be made. If those attempts fail, it must be redirected to errorTopic. On success from either topic or retryTopic, the message should be redirected to successTopic.
Upvotes: 0
Views: 3713
Reputation: 174554
It's a little simpler with Spring Boot 2.2.4 and Spring for Apache Kafka 2.3.5, which is shown first
(a version for 2.2.x is shown further below).
/**
 * Demo application (Spring Boot 2.2.4 / Spring for Apache Kafka 2.3.5) that forwards
 * failed records from {@code topic} to {@code retryTopic}, and records that keep failing
 * on {@code retryTopic} to {@code errorTopic}.
 */
@SpringBootApplication
public class So60172304Application {

    private static final String MAIN_TOPIC = "topic";

    private static final String RETRY_TOPIC = "retryTopic";

    private static final String SUCCESS_TOPIC = "successTopic";

    private static final String ERROR_TOPIC = "errorTopic";

    public static void main(String[] args) {
        SpringApplication.run(So60172304Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name(MAIN_TOPIC).partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic retryTopic() {
        return TopicBuilder.name(RETRY_TOPIC).partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic successTopic() {
        return TopicBuilder.name(SUCCESS_TOPIC).partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic errorTopic() {
        return TopicBuilder.name(ERROR_TOPIC).partitions(1).replicas(1).build();
    }

    /**
     * Sends three sample payloads to the main topic on startup.
     *
     * @param template the template used to publish the sample records.
     * @return the runner.
     */
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send(MAIN_TOPIC, "failAlways");
            template.send(MAIN_TOPIC, "onlyFailFirst");
            template.send(MAIN_TOPIC, "good");
        };
    }

    /**
     * Container factory that applies {@link #customizer(KafkaTemplate)} to every
     * container it creates.
     * <p>
     * A custom container factory is needed until 2.3.6 is released because the
     * container customizer was not applied before then.
     *
     * @param configurer Boot's configurer, used to apply the standard factory settings.
     * @param kafkaConsumerFactory the consumer factory.
     * @param template the template used to publish failed records.
     * @return the factory.
     */
    @Bean
    ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            KafkaTemplate<Object, Object> template) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<Object, Object>() {

                    @Override
                    protected void initializeContainer(ConcurrentMessageListenerContainer<Object, Object> instance,
                            KafkaListenerEndpoint endpoint) {

                        super.initializeContainer(instance, endpoint);
                        customizer(template).configure(instance);
                    }

                };
        configurer.configure(factory, kafkaConsumerFactory);
        // factory.setContainerCustomizer(customizer(template)); // after 2.3.6
        return factory;
    }

    /**
     * Builds the customizer that installs a topic-specific error handler.
     * Containers whose first topic is neither the main nor the retry topic, and
     * containers with no topic list at all, are left untouched.
     *
     * @param template the template used to publish failed records.
     * @return the customizer.
     */
    private ContainerCustomizer<Object, Object, ConcurrentMessageListenerContainer<Object, Object>>
            customizer(KafkaTemplate<Object, Object> template) {

        return container -> {
            String[] topics = container.getContainerProperties().getTopics();
            // getTopics() is null when the listener subscribes by pattern or by explicit
            // partitions; indexing it blindly would throw while the container is created.
            if (topics == null || topics.length == 0) {
                return;
            }
            String firstTopic = topics[0];
            if (MAIN_TOPIC.equals(firstTopic)) {
                // no retries on the main topic: a failure goes straight to the retry topic
                container.setErrorHandler(
                        forwardingErrorHandler(template, RETRY_TOPIC, new FixedBackOff(0L, 0L)));
            }
            else if (RETRY_TOPIC.equals(firstTopic)) {
                // back-off of 5000 ms with a maximum of 2 retry attempts, then the error topic
                container.setErrorHandler(
                        forwardingErrorHandler(template, ERROR_TOPIC, new FixedBackOff(5000L, 2L)));
            }
        };
    }

    /**
     * Creates an error handler that, once the back-off is exhausted, publishes the failed
     * record to the same partition number of {@code destinationTopic}.
     *
     * @param template the template used to publish the failed record.
     * @param destinationTopic the topic that receives the failed record.
     * @param backOff the retry back-off applied before the record is forwarded.
     * @return the error handler.
     */
    private static SeekToCurrentErrorHandler forwardingErrorHandler(KafkaTemplate<Object, Object> template,
            String destinationTopic, FixedBackOff backOff) {

        return new SeekToCurrentErrorHandler(
                new DeadLetterPublishingRecoverer(template,
                        (cr, ex) -> new TopicPartition(destinationTopic, cr.partition())),
                backOff);
    }

}
/**
 * Listeners for the four demo topics. The {@code topic} and {@code retryTopic}
 * listeners throw for selected payloads to simulate processing failures and
 * otherwise forward the payload to {@code successTopic}; the remaining two
 * listeners only print what they receive.
 */
@Component
class Listener {

    private final KafkaTemplate<String, String> template;

    public Listener(KafkaTemplate<String, String> template) {
        this.template = template;
    }

    /**
     * Handles records from {@code topic}.
     *
     * @param in the record payload.
     * @throws RuntimeException carrying the payload, when the payload contains
     * "fail" in any letter case.
     */
    @KafkaListener(id = "so60172304.1", topics = "topic")
    public void listen1(String in) {
        System.out.println("topic: " + in);
        // Locale.ROOT keeps the case-insensitive match independent of the JVM's default
        // locale (a Turkish default locale lower-cases 'I' to a dotless 'ı', for example).
        if (in.toLowerCase(java.util.Locale.ROOT).contains("fail")) {
            throw new RuntimeException(in);
        }
        this.template.send("successTopic", in);
    }

    /**
     * Handles records from {@code retryTopic}.
     *
     * @param in the record payload.
     * @throws RuntimeException carrying the payload, when the payload starts with
     * lower-case "fail".
     */
    @KafkaListener(id = "so60172304.2", topics = "retryTopic")
    public void listen2(String in) {
        System.out.println("retryTopic: " + in);
        if (in.startsWith("fail")) {
            throw new RuntimeException(in);
        }
        this.template.send("successTopic", in);
    }

    /**
     * Prints records received from {@code successTopic}.
     *
     * @param in the record payload.
     */
    @KafkaListener(id = "so60172304.3", topics = "successTopic")
    public void listen3(String in) {
        System.out.println("successTopic: " + in);
    }

    /**
     * Prints records received from {@code errorTopic}.
     *
     * @param in the record payload.
     */
    @KafkaListener(id = "so60172304.4", topics = "errorTopic")
    public void listen4(String in) {
        System.out.println("errorTopic: " + in);
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
result:
topic: failAlways
retryTopic: failAlways
topic: onlyFailFirst
topic: good
successTopic: good
retryTopic: failAlways
retryTopic: failAlways
retryTopic: onlyFailFirst
errorTopic: failAlways
successTopic: onlyFailFirst
With Spring Boot 2.1.12 and Spring for Apache Kafka 2.2.12:
/**
 * Demo application (Spring Boot 2.1.12 / Spring for Apache Kafka 2.2.12) that forwards
 * failed records from {@code topic} to {@code retryTopic}, and records that keep failing
 * on {@code retryTopic} to {@code errorTopic}. Retries on the retry topic are performed
 * by a {@link RetryTemplate} set on a dedicated container factory.
 */
@SpringBootApplication
public class So601723041Application {

    private static final String MAIN_TOPIC = "topic";

    private static final String RETRY_TOPIC = "retryTopic";

    private static final String SUCCESS_TOPIC = "successTopic";

    private static final String ERROR_TOPIC = "errorTopic";

    public static void main(String[] args) {
        SpringApplication.run(So601723041Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic(MAIN_TOPIC, 1, (short) 1);
    }

    @Bean
    public NewTopic retryTopic() {
        return new NewTopic(RETRY_TOPIC, 1, (short) 1);
    }

    @Bean
    public NewTopic successTopic() {
        return new NewTopic(SUCCESS_TOPIC, 1, (short) 1);
    }

    @Bean
    public NewTopic errorTopic() {
        return new NewTopic(ERROR_TOPIC, 1, (short) 1);
    }

    /**
     * Sends three sample payloads to the main topic on startup.
     *
     * @param template the template used to publish the sample records.
     * @return the runner.
     */
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send(MAIN_TOPIC, "failAlways");
            template.send(MAIN_TOPIC, "onlyFailFirst");
            template.send(MAIN_TOPIC, "good");
        };
    }

    /**
     * Default container factory; every container it creates gets a topic-specific
     * error handler.
     *
     * @param configurer Boot's configurer, used to apply the standard factory settings.
     * @param kafkaConsumerFactory the consumer factory.
     * @param template the template used to publish failed records.
     * @return the factory.
     */
    @Bean
    ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            KafkaTemplate<Object, Object> template) {

        return customizingFactory(configurer, kafkaConsumerFactory, template);
    }

    /**
     * Container factory for the retry topic listener: same as the default factory,
     * plus a retry template with a 3-attempt simple retry policy and a fixed
     * 5000 ms back-off period.
     *
     * @param configurer Boot's configurer, used to apply the standard factory settings.
     * @param kafkaConsumerFactory the consumer factory.
     * @param template the template used to publish failed records.
     * @return the factory.
     */
    @Bean
    ConcurrentKafkaListenerContainerFactory<?, ?> retryKafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            KafkaTemplate<Object, Object> template) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                customizingFactory(configurer, kafkaConsumerFactory, template);
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(5000L);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        factory.setRetryTemplate(retryTemplate);
        return factory;
    }

    /**
     * Creates a factory whose containers are passed through
     * {@link #customize(ConcurrentMessageListenerContainer, KafkaTemplate)} right after
     * initialization, then applies Boot's standard configuration to it.
     */
    private ConcurrentKafkaListenerContainerFactory<Object, Object> customizingFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            KafkaTemplate<Object, Object> template) {

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<Object, Object>() {

                    @Override
                    protected void initializeContainer(ConcurrentMessageListenerContainer<Object, Object> instance,
                            KafkaListenerEndpoint endpoint) {

                        super.initializeContainer(instance, endpoint);
                        customize(instance, template);
                    }

                };
        configurer.configure(factory, kafkaConsumerFactory);
        return factory;
    }

    /**
     * Installs a topic-specific error handler on the container. Containers whose first
     * topic is neither the main nor the retry topic, and containers with no topic list
     * at all, are left untouched.
     *
     * @param container the container being initialized.
     * @param template the template used to publish failed records.
     */
    private void customize(ConcurrentMessageListenerContainer<Object, Object> container,
            KafkaTemplate<Object, Object> template) {

        String[] topics = container.getContainerProperties().getTopics();
        // getTopics() is null when the listener subscribes by pattern or by explicit
        // partitions; indexing it blindly would throw while the container is created.
        if (topics == null || topics.length == 0) {
            return;
        }
        String firstTopic = topics[0];
        if (MAIN_TOPIC.equals(firstTopic)) {
            container.setErrorHandler(forwardingErrorHandler(template, RETRY_TOPIC));
        }
        else if (RETRY_TOPIC.equals(firstTopic)) {
            container.setErrorHandler(forwardingErrorHandler(template, ERROR_TOPIC));
        }
    }

    /**
     * Creates an error handler that publishes the failed record to the same partition
     * number of {@code destinationTopic}.
     *
     * @param template the template used to publish the failed record.
     * @param destinationTopic the topic that receives the failed record.
     * @return the error handler.
     */
    private static SeekToCurrentErrorHandler forwardingErrorHandler(KafkaTemplate<Object, Object> template,
            String destinationTopic) {

        return new SeekToCurrentErrorHandler(
                new DeadLetterPublishingRecoverer(template,
                        (cr, ex) -> new TopicPartition(destinationTopic, cr.partition())),
                0); // no retries here - retry template instead.
    }

}
/**
 * Listeners for the four demo topics. The {@code topic} and {@code retryTopic}
 * listeners throw for selected payloads to simulate processing failures and
 * otherwise forward the payload to {@code successTopic}; the remaining two
 * listeners only print what they receive. The {@code retryTopic} listener is
 * bound to the {@code retryKafkaListenerContainerFactory} container factory.
 */
@Component
class Listener {

    private final KafkaTemplate<String, String> template;

    public Listener(KafkaTemplate<String, String> template) {
        this.template = template;
    }

    /**
     * Handles records from {@code topic}.
     *
     * @param in the record payload.
     * @throws RuntimeException carrying the payload, when the payload contains
     * "fail" in any letter case.
     */
    @KafkaListener(id = "so60172304.1", topics = "topic")
    public void listen1(String in) {
        System.out.println("topic: " + in);
        // Locale.ROOT keeps the case-insensitive match independent of the JVM's default
        // locale (a Turkish default locale lower-cases 'I' to a dotless 'ı', for example).
        if (in.toLowerCase(java.util.Locale.ROOT).contains("fail")) {
            throw new RuntimeException(in);
        }
        this.template.send("successTopic", in);
    }

    /**
     * Handles records from {@code retryTopic}.
     *
     * @param in the record payload.
     * @throws RuntimeException carrying the payload, when the payload starts with
     * lower-case "fail".
     */
    @KafkaListener(id = "so60172304.2", topics = "retryTopic", containerFactory = "retryKafkaListenerContainerFactory")
    public void listen2(String in) {
        System.out.println("retryTopic: " + in);
        if (in.startsWith("fail")) {
            throw new RuntimeException(in);
        }
        this.template.send("successTopic", in);
    }

    /**
     * Prints records received from {@code successTopic}.
     *
     * @param in the record payload.
     */
    @KafkaListener(id = "so60172304.3", topics = "successTopic")
    public void listen3(String in) {
        System.out.println("successTopic: " + in);
    }

    /**
     * Prints records received from {@code errorTopic}.
     *
     * @param in the record payload.
     */
    @KafkaListener(id = "so60172304.4", topics = "errorTopic")
    public void listen4(String in) {
        System.out.println("errorTopic: " + in);
    }

}
EDIT
To change the payload in the published record, you could use something like this (call `MyRepublisher.setNewValue("new value");` on the listener thread before the exception is thrown).
/**
 * A {@link DeadLetterPublishingRecoverer} that can publish a replacement payload
 * instead of the failed record's own value.
 * <p>
 * The replacement is held in a {@link ThreadLocal}, so {@link #setNewValue(String)}
 * must be called on the same thread that later invokes the recoverer. It is consumed
 * (cleared) by the next record published from that thread.
 */
public class MyRepublisher extends DeadLetterPublishingRecoverer {

    /** Replacement payload for the next record published from the current thread. */
    private static final ThreadLocal<String> newValue = new ThreadLocal<>();

    /**
     * @param template the template used to publish the failed record.
     * @param destinationResolver resolves the destination topic/partition for a failed record.
     */
    public MyRepublisher(KafkaTemplate<Object, Object> template,
            BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {

        super(template, destinationResolver);
    }

    /**
     * Builds the outgoing record, using the replacement payload set for the current
     * thread or, when none was set, the failed record's original value.
     *
     * @param record the failed record.
     * @param topicPartition the destination; a negative partition means "no explicit partition".
     * @param headers the headers for the outgoing record.
     * @return the record to publish.
     */
    @Override
    protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
            TopicPartition topicPartition, RecordHeaders headers) {

        try {
            String replacement = newValue.get();
            // Without a replacement, keep the original payload rather than publishing null.
            Object payload = replacement != null ? replacement : record.value();
            return new ProducerRecord<>(topicPartition.topic(),
                    topicPartition.partition() < 0 ? null : topicPartition.partition(),
                    record.key(), payload, headers);
        }
        finally {
            // Always clear, so a value never leaks into a later, unrelated failure.
            newValue.remove();
        }
    }

    /**
     * Sets the payload to publish for the next failed record handled on the calling thread.
     *
     * @param value the replacement payload.
     */
    public static void setNewValue(String value) {
        newValue.set(value);
    }

}
Upvotes: 2