Reputation: 879
I am trying to create a kafka consumer which subscribe to a topic and poll regular interval. Below @Bean creates kafkaconsumer which is used in @postconstruct by executor service. In other use case I need to read the latest offset or replay from specific offset, while i try to create a new kafka cosumer it is throwing javax.management.InstanceAlreadyExistsException: kafka.consumer
Is there any to use the existing kafkaConsumer bean to get the latest offset or replay some of the offset.
@Configuration
@Slf4j
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConsumerClientConfig {
@Bean
public KafkaConsumer<String, String> kafkaClientConfig() {
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(MetadataServiceUtil.getConsumerProperties(kafkaProperties));
log.info("Kafka Consumer created successfully with SecurityProtocol={}", kafkaProperties.getSecurityProtocol());
kafkaConsumer.subscribe(List.of(kafkaProperties.getTopicName()));
return kafkaConsumer;
}
}
Kafka client code to create kafka consumer
@Component
@Slf4j
@Data
public class PipelineKafkaClient {
@Autowired
private KafkaConsumer kafkaConsumer;
@Autowired
private KafkaProperties kafkaProperties;
@PostConstruct
void startup() {
log.info("*******Starting event listener********");
executorService = Executors.newSingleThreadExecutor();
executorService.submit(new EventConsumer(kafkaConsumer));
// In the EventConsumer i have the code as below
/* while(true) {
* ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
* processBatch(records);
* kafkaConsumer.commitSync();
* }
*/
}
public void replayFromSpecificOffset(long offsetToReadFrom, long offsetReadTo) {
log.info("Replaying messages");
Properties properties = MetadataServiceUtil.getConsumerProperties(kafkaProperties);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// CODE to read from specific offset
}
}@Component
@Slf4j
@Data
public class PipelineKafkaClient {
@Autowired
private KafkaConsumer kafkaConsumer;
@Autowired
private KafkaProperties kafkaProperties;
@PostConstruct
void startup() {
log.info("*******Starting event listener********");
executorService = Executors.newSingleThreadExecutor();
executorService.submit(new EventConsumer(kafkaConsumer));
// In the EventConsumer i have the code as below
/* while(true) {
* ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(500));
* processBatch(records);
* kafkaConsumer.commitSync();
* }
*/
}
public void replayFromSpecificOffset(long offsetToReadFrom, long offsetReadTo) {
log.info("Replaying messages");
Properties properties = MetadataServiceUtil.getConsumerProperties(kafkaProperties);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// CODE to read from specific offset
}
}
Upvotes: 0
Views: 827
Reputation: 41
I think you are trying to create new instance of KC in your function but that bean is already registered in Spring configuration class KafkaConsumerClientConfig. Solution: try to re use the existing KC instance as a bean in config class instead of new one. Inject it in a class where you want to use it. Here is a hint KafkaConsumer<String, String> consumer = kafkaConsumer instead of initiating a new one.
Upvotes: 0