shj
shj

Reputation: 1678

Spring Boot Kafka Configure DefaultErrorHandler?

I created a batch-consumer following the Spring Kafka docs:

@SpringBootApplication
public class ApplicationConsumer {
  private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationConsumer.class);
  private static final String TOPIC = "foo";

  public static void main(String[] args) {
    ConfigurableApplicationContext context = SpringApplication.run(ApplicationConsumer.class, args);
  }

  @Bean
  public RecordMessageConverter converter() {
    return new JsonMessageConverter();
  }

  @Bean
  public BatchMessagingMessageConverter batchConverter() {
    return new BatchMessagingMessageConverter(converter());
  }

  @KafkaListener(topics = TOPIC)
  public void listen(List<Name> ps) {
    LOGGER.info("received name beans: {}", Arrays.toString(ps.toArray()));
  }
}

I was able to successfully get the consumer running by defining the following additional configuration env variables, that Spring automatically picks up:

export SPRING_KAFKA_BOOTSTRAP-SERVERS=...
export SPRING_KAFKA_CONSUMER_GROUP-ID=...

So the above code works. But now I want to customize the default error handler to use exponential backoff. From the ref docs I tried adding the following to ApplicationConsumer class:

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setCommonErrorHandler(new DefaultErrorHandler(new ExponentialBackOffWithMaxRetries(10)));
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    return props;
}

But now I get errors saying that it can't find some of the configuration. It looks like I'm stuck having to redefine all of the properties in consumerConfigs() that were already being automatically defined before. This includes everything from bootstrap server uris to the json-deserialization config.

Is there a good way to update my first version of the code to just override the default-error handler?

Upvotes: 5

Views: 13858

Answers (1)

Gary Russell
Gary Russell

Reputation: 174799

Just define the error handler as a @Bean and Boot will automatically wire it into its auto configured container factory.

EDIT

This works as expected for me:

@SpringBootApplication
public class So70884203Application {

    public static void main(String[] args) {
        SpringApplication.run(So70884203Application.class, args);
    }

    @Bean
    DefaultErrorHandler eh() {
        return new DefaultErrorHandler((rec, ex) -> {
            System.out.println("Recovered: " + rec);
        }, new FixedBackOff(0L, 0L));
    }

    @KafkaListener(id = "so70884203", topics = "so70884203")
    void listen(String in) {
        System.out.println(in);
        throw new RuntimeException("test");
    }

    @Bean
    NewTopic topic() {
        return TopicBuilder.name("so70884203").partitions(1).replicas(1).build();
    }

}
foo
Recovered: ConsumerRecord(topic = so70884203, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1643316625291, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = foo)

Upvotes: 8

Related Questions