Reputation: 11
We have a producer-consumer setup and use Spring Boot for our project. Kafka is configured with the following class:
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@EnableKafka
public class DefaultKafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.bootstrap-servers-group}")
    private String bootstrapServersGroup;

    @Bean
    public ConsumerFactory<String, String> consumerDefaultFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // String keys and values, matching the declared ConsumerFactory<String, String> type
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, bootstrapServersGroup);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerDefaultContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerDefaultFactory());
        return factory;
    }
}
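A listener wired to this container factory looks roughly like the sketch below; the topic name and listener class are illustrative placeholders rather than our actual code:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Illustrative listener using the container factory defined above.
// The topic name and class name are placeholders.
@Component
public class LiveEventListener {

    @KafkaListener(topics = "live-events", containerFactory = "kafkaListenerDefaultContainerFactory")
    public void onMessage(String message) {
        // Each message carries the event status, e.g. "live:1" or "live:0"
        System.out.println("Received: " + message);
    }
}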
SCENARIO : We write values to Kafka topics. Consider a topic that carries live-event data, where a status of "live:0" marks a completed event and "live:1" marks a live event. When an event goes live, an update is written to the topic, and we process the event based on what we read from that topic.
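To make the update flow concrete: every status change is published as a new record on the topic. A keyed send along the lines of the sketch below (the topic name and event id are placeholders, not our actual values) keeps all updates for one event on the same partition and therefore in order:

// Sketch: publish a status update keyed by event id so that all updates for
// the same event land on the same partition and are read in order.
// "live-events" and "event-123" are placeholders.
kafkaTemplate.send("live-events", "event-123", "live:1");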
ISSUE : When an event goes live, I read the "live:1" record from the topic and process it. When the event is updated and new data is written to the topic, I can read the new record, but I also receive the old record again at the same time. Because I get both old and new data together, the event is affected: sometimes it shows as live, sometimes as completed.
Can anyone offer suggestions here? Why am I getting both the already-committed data and the newly updated data? Am I missing something in the configuration?
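For reference, one pattern that keeps completed-event records out of the listener is a RecordFilterStrategy on the container factory; the sketch below assumes the status can be recognized from the payload (the "live:0" check is illustrative):

// Sketch: a container factory that discards records marked as completed
// ("live:0") before they reach the listener. The payload check is an
// assumption about the message format described above.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> filteringListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerDefaultFactory());
    // Returning true from the filter discards the record
    factory.setRecordFilterStrategy(record -> record.value().contains("live:0"));
    return factory;
}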
Upvotes: 1
Views: 578
Reputation: 11
try {
    // Send asynchronously and log the outcome via a callback
    ListenableFuture<SendResult<String, String>> futureResult = this.kafkaTemplate.send(topicName, message);
    futureResult.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

        @Override
        public void onSuccess(SendResult<String, String> result) {
            log.info("Message successfully sent to topic {} with offset {} ", result.getRecordMetadata().topic(), result.getRecordMetadata().offset());
        }

        @Override
        public void onFailure(Throwable ex) {
            FAILMESSAGELOGGER.info("{},{}", topicName, message);
            log.info("Unable to send message to topic {} due to ", topicName, ex);
        }
    });
} catch (Exception e) {
    log.error("Outer exception occurred while sending message {} to topic {}", new Object[] { message, topicName, e });
    FAILMESSAGELOGGER.info("{},{}", topicName, message);
}
This is what we have.
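The KafkaTemplate used above is assumed to come from a standard producer configuration along these lines (a sketch, not our exact class; the serializers mirror the String key/value types on the consumer side):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

// Sketch of the producer configuration assumed to back the KafkaTemplate above.
@Configuration
public class DefaultKafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}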
Upvotes: 0
Reputation: 504
You may want to check a couple of things:
1. the number of partitions
2. the number of consumers
Does this also mean that you are re-writing the consumed message to the topic again with the new status?
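For instance, the partition count can be inspected with Kafka's AdminClient; the sketch below assumes a local broker and uses a placeholder topic name:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

// Sketch: print how many partitions a topic has.
// The bootstrap address and topic name ("live-events") are placeholders.
public class PartitionCountCheck {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription description = admin.describeTopics(Collections.singletonList("live-events"))
                    .all().get().get("live-events");
            System.out.println("live-events has " + description.partitions().size() + " partitions");
        }
    }
}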
Upvotes: 1