Reputation: 1078
I am using Spring Boot 2.1.9 with Spring Kafka 2.2.9.
I am getting a warning in the log file saying that the commit failed. I am also using SeekToCurrentErrorHandler to capture the error once retries are exhausted, but sometimes, if the commit fails, it keeps iterating.
Here is my config class:
public class KafkaReceiverConfig {

    // Kafka Server Configuration
    private String kafkaServers;
    // Group Identifier
    private String groupId;
    // Kafka Max Retry Attempts
    private Integer retryMaxAttempts;
    // Kafka Max Retry Interval
    private Long retryInterval;
    // Kafka Concurrency
    private Integer concurrency;
    // Kafka Poll Timeout
    private Integer pollTimeout;
    // Kafka Consumer Offset
    private String offset = "earliest";
    // Logger
    private static final Logger log = LoggerFactory.getLogger(KafkaReceiverConfig.class);

    /**
     * Defines the Max Number of Retry Attempts
     * @return Return the Retry Policy @see {@link RetryPolicy}
     */
    public RetryPolicy retryPolicy() {
        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
        return simpleRetryPolicy;
    }

    /**
     * Time before the next Retry can happen, the Time used is in Milliseconds
     * @return Return the BackOff Policy @see {@link BackOffPolicy}
     */
    public BackOffPolicy backOffPolicy() {
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        return backOffPolicy;
    }

    /**
     * Get Retry Template
     * @return Return the Retry Template @see {@link RetryTemplate}
     */
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        return retryTemplate;
    }

    /**
     * String Kafka Listener Container Factory
     * @return @see {@link KafkaListenerContainerFactory}
     */
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
            ChainedKafkaTransactionManager<String, String> chainedTM, MessageProducer messageProducer) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        // NOTE: retryMaxAttempts should always be +1 due to a spring kafka bug
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record, exception) -> {
            log.warn("failed to process kafka message (retries are exhausted). topic name:" + record.topic() + " value:" + record.value());
            messageProducer.saveFailedMessage(record, exception);
        }, retryMaxAttempts + 1);
        log.debug("Kafka Receiver Config kafkaListenerContainerFactory created");
        return factory;
    }

    /**
     * String Consumer Factory
     * @return @see {@link ConsumerFactory}
     */
    public ConsumerFactory<String, String> consumerFactory() {
        log.debug("Kafka Receiver Config consumerFactory created");
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Consumer Configurations
     * @return @see {@link Map}
     */
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new ConcurrentHashMap<String, Object>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Disable the Auto Commit if required for testing
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        log.debug("Kafka Receiver Config consumerConfigs created");
        return props;
    }
}
Here is the log:
2019-10-30 15:48:05.907 WARN [xxxxx-component-workflow-starter,,,] 11 --- [nt_create-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=fulfillment_create] Synchronous auto-commit of offsets {fulfillment_create-4=OffsetAndMetadata{offset=32, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
Upvotes: 0
Views: 947
Reputation: 174689
You are taking too long to process the records returned by the poll().
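To make "too long" concrete, here is a rough sketch of the arithmetic. It assumes the Kafka consumer defaults (max.poll.records = 500, max.poll.interval.ms = 300000) and a hypothetical processing time of one second per record; the class and method names are made up for illustration. Any retry back-off that runs inside the listener would add to the per-record time.

```java
public class PollBudget {

    // Kafka consumer defaults: at most 500 records per poll(), 5 minutes between polls.
    public static final int DEFAULT_MAX_POLL_RECORDS = 500;
    public static final long DEFAULT_MAX_POLL_INTERVAL_MS = 300_000L;

    /** Worst-case time, in milliseconds, to process one full batch returned by poll(). */
    public static long worstCaseBatchMillis(int maxPollRecords, long perRecordMillis) {
        return (long) maxPollRecords * perRecordMillis;
    }

    /** True if a full batch can be processed before the poll interval expires. */
    public static boolean fitsInPollInterval(int maxPollRecords, long perRecordMillis, long maxPollIntervalMs) {
        return worstCaseBatchMillis(maxPollRecords, perRecordMillis) < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        long perRecordMillis = 1_000L; // hypothetical: one second per record

        // 500 records * 1000 ms = 500000 ms, which exceeds the 300000 ms interval.
        System.out.println(fitsInPollInterval(DEFAULT_MAX_POLL_RECORDS, perRecordMillis, DEFAULT_MAX_POLL_INTERVAL_MS));

        // 100 records * 1000 ms = 100000 ms, which fits.
        System.out.println(fitsInPollInterval(100, perRecordMillis, DEFAULT_MAX_POLL_INTERVAL_MS));
    }
}
```

If the first check is false for your real processing time, the consumer is likely to be removed from the group before it can commit, which matches the warning in the question.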
You need to reduce max.poll.records and/or increase max.poll.interval.ms.
You can't perform a seek after this error - you have lost the partitions.
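As a minimal sketch of what that tuning could look like, the helper below adds both settings to an existing consumer property map. The helper name and the values 50 and 600000 are assumptions for illustration only, not recommendations; pick values based on how long your listener really takes per record. Plain string keys are used so the snippet runs without the Kafka client on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerTuning {

    /**
     * Returns a copy of the given consumer properties with the two poll-related
     * limits set. The keys are the standard Kafka consumer property names.
     */
    public static Map<String, Object> withPollLimits(Map<String, Object> base,
                                                     int maxPollRecords,
                                                     int maxPollIntervalMs) {
        Map<String, Object> props = new HashMap<>(base);
        // Fewer records per poll() means less work between two polls.
        props.put("max.poll.records", maxPollRecords);
        // More time allowed between polls before the group rebalances.
        props.put("max.poll.interval.ms", maxPollIntervalMs);
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> base = new HashMap<>();
        base.put("group.id", "fulfillment_create");

        // Hypothetical values: 50 records per poll, 10 minutes between polls.
        Map<String, Object> tuned = withPollLimits(base, 50, 600_000);
        System.out.println(tuned);
    }
}
```

In the consumerConfigs() method from the question, the equivalent would be two extra props.put(...) calls using ConsumerConfig.MAX_POLL_RECORDS_CONFIG and ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG.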
Upvotes: 1