Kiran

Reputation: 879

Kafka Consumer InstanceAlreadyExistsException

I am trying to create a Kafka consumer that subscribes to a topic and polls at a regular interval. The @Bean below creates the KafkaConsumer, which an executor service then uses from a @PostConstruct method. In another use case I need to read the latest offset or replay from a specific offset. When I try to create a new Kafka consumer for that, it throws javax.management.InstanceAlreadyExistsException: kafka.consumer

Is there any way to use the existing kafkaConsumer bean to get the latest offset or to replay some of the offsets?

@Configuration
@Slf4j
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConsumerClientConfig {
    // Spring injects the KafkaProperties bean as a method parameter.
    @Bean
    public KafkaConsumer<String, String> kafkaClientConfig(KafkaProperties kafkaProperties) {
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(MetadataServiceUtil.getConsumerProperties(kafkaProperties));
        log.info("Kafka Consumer created successfully with SecurityProtocol={}", kafkaProperties.getSecurityProtocol());
        kafkaConsumer.subscribe(List.of(kafkaProperties.getTopicName()));
        return kafkaConsumer;
    }
}

Kafka client code that uses the consumer bean and creates the second consumer:

@Component
@Slf4j
@Data
public class PipelineKafkaClient {

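    @Autowired
    private KafkaConsumer<String, String> kafkaConsumer;

    @Autowired
    private KafkaProperties kafkaProperties;

    // Runs the polling loop on a single background thread.
    private ExecutorService executorService;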

    @PostConstruct
    void startup() {
        log.info("*******Starting event listener********");
        executorService = Executors.newSingleThreadExecutor();
        executorService.submit(new EventConsumer(kafkaConsumer));
        // In the EventConsumer i have the code as below
        /*  while(true) {
         *   ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
         *   processBatch(records);
         *   kafkaConsumer.commitSync();
         * }
         */

    }

    public void replayFromSpecificOffset(long offsetToReadFrom, long offsetReadTo) {
        log.info("Replaying messages");
        Properties properties = MetadataServiceUtil.getConsumerProperties(kafkaProperties);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // CODE to read from specific offset    
    }

}

Upvotes: 0

Views: 827

Answers (1)

qalokoz

Reputation: 41

I think you are trying to create a new KafkaConsumer instance in your function, but one is already registered as a bean in the Spring configuration class KafkaConsumerClientConfig. Solution: reuse the existing KafkaConsumer bean from the config class instead of creating a new one, and inject it into the class where you want to use it. Here is a hint: write KafkaConsumer<String, String> consumer = kafkaConsumer instead of instantiating a new one.
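
If a second, short-lived consumer is still wanted for the replay, one likely cause of the exception is worth ruling out. The Kafka client registers a JMX MBean whose name includes the client.id, so two consumers in the same JVM that share an explicit client.id can collide on registration. The sketch below assumes that getConsumerProperties sets a fixed client.id, which the question does not show. ReplayConsumerProps and withUniqueClientId are hypothetical names introduced only for illustration.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper, not part of the original code: copies the consumer
// properties and gives the copy its own client.id so that a second consumer
// does not reuse the JMX name of the first one.
final class ReplayConsumerProps {

    private ReplayConsumerProps() {
    }

    static Properties withUniqueClientId(Properties base, String prefix) {
        Properties copy = new Properties();
        // Copy every entry so the caller's properties stay untouched.
        copy.putAll(base);
        // "client.id" is the standard Kafka client configuration key.
        copy.put("client.id", prefix + "-" + UUID.randomUUID());
        return copy;
    }
}
```

The copy could then be passed to new KafkaConsumer<>(...) inside replayFromSpecificOffset, using assign and seek on the partition of interest rather than subscribe, and the consumer closed once the replay is finished. If the existing bean is reused instead, keep in mind that KafkaConsumer is not safe for multi-threaded access, so the replay would have to run on the same thread as the polling loop.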

Upvotes: 0
