Reputation: 923
I have a simple kafka consumer listener in a springboot application, like this:
public void receive(String message) {"received message='{}'", messge);
in some particular cases I would like to reject the message, but I would like the system to propose it to me again after a certain time;
how can I do?
Note: I would also like the kafka configuration to be taken custom-made (not default springboot structure)
Upvotes: 1
Views: 2779
Reputation: 825
my implementation does just the thing you need:
1) kafka configuration class that takes the fields from the custom property and retry the rejected messages after 5000 milliseconds (inside the kafkaListenerContainerFactory method):
public class KafkaConfig {
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
if(enableSsl) {
//configure the following three settings for SSL Encryption
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslPassword);
// configure the following three settings for SSL Authentication
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslKeystoreLocation);
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslPassword);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, sslPassword);
return props;
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
ContainerProperties containerProperties = factory.getContainerProperties();
RetryTemplate retryTemplate = new RetryTemplate();
//infinite number of retry attempts
retryTemplate.setRetryPolicy(new AlwaysRetryPolicy());
//wait a "waitingTime" time before retrying
int waitingTime = 5000;
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
//or use exponential waiting
//ExponentialBackOffPolicy expBackoff = new ExponentialBackOffPolicy();
return factory;
2) class that consumes messages:
public class Consumer {
private final Logger logger = LoggerFactory.getLogger(Consumer.class);
public void consume(String message, Acknowledgment ack) throws IOException {
if(processMessage) {"##KAFKA## -> Consumed message -> %s", message));
else {
logger.error(String.format("##KAFKA## -> Failed message -> %s", message));
throw new IOException("reject message");
Upvotes: 1
Reputation: 174554
See retrying deliveries and Stateful Retry.
Configure the listener factory with your desired retry characteristics and (optionally) add a SeekToCurrentErrorHandler
Upvotes: 1