Reputation: 396
We have an e-commerce app for which we store product details of some e-commerce stores in MariaDB. On top of MariaDB we have SQLite, which acts as a cache and stores the most popular products (20% of the products from each store). We chose SQLite as a cache instead of a key-value store because we generate SQL dynamically based on different filters on products, variants, and collections.
Any updates on products (e.g., title, price, etc.) are first applied to MariaDB. To keep our cache (SQLite) in sync with these updates, we use CDC (Zendesk's Maxwell). Whenever there is an update in MariaDB, Maxwell captures the change and pushes it to Kafka. A Spring app consumes these changes from Kafka and updates our cache (SQLite).
Everything worked well until we ran into bulk imports of products or collections from a store into our MariaDB. A bulk import produces millions of upserts in MariaDB, which in turn leads to millions of CDC messages in Kafka.
In simple terms, the rate at which Maxwell produces CDC messages into Kafka is greater than the rate at which our Spring app can consume them, which leads to huge Kafka consumer lag.
Since SQLite doesn't support multiple concurrent writers, we can't add more consumers to work through those messages.
It would be really helpful if someone could suggest how to handle this.
Kafka configuration I used
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

ConsumerFactory<String, String> kafkaConsumerFactory(Boolean autoCommit) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServerAddress);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "25");
    return new DefaultKafkaConsumerFactory<>(props);
}
@Bean(KafkaConstants.LISTENER_CONTAINER_FACTORY)
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(kafkaConsumerFactory(Boolean.TRUE));
    factory.setConcurrency(1);
    return factory;
}
Kafka Consumer
@KafkaListener(id = "myGroup", topics = {"myTopic"})
public void productDetailsConsumer(String payload) {
    // ... process messages and push to SQLite
}
Upvotes: 0
Views: 122
Reputation: 121542
From a Spring for Apache Kafka point of view, the only optimization I see is a batch listener:
https://docs.spring.io/spring-kafka/reference/html/#batch-listeners
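A minimal sketch of that change, reusing the factory and listener from the question (the single-transaction SQLite write described in the listener body is an assumption about how you could batch the cache update, not something Spring does for you):

import java.util.List;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

// Same factory as in the question, with batch delivery enabled.
@Bean(KafkaConstants.LISTENER_CONTAINER_FACTORY)
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(kafkaConsumerFactory(Boolean.TRUE));
    factory.setConcurrency(1);
    factory.setBatchListener(true); // each listener call now receives the whole poll result
    return factory;
}

// The payload type changes from String to List<String>.
@KafkaListener(id = "myGroup", topics = {"myTopic"})
public void productDetailsConsumer(List<String> payloads) {
    // Assumption: write all changes to SQLite in a single transaction rather
    // than one commit per message; SQLite pays its commit/fsync cost per
    // transaction, so this is where most of the throughput gain comes from.
}

The batch listener only changes how records are handed to your code; with auto-commit enabled as in the question, offsets are still committed by the consumer client on its configured interval.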
You can probably also poll many more records than 25 by increasing ConsumerConfig.MAX_POLL_RECORDS_CONFIG.
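For instance, in the kafkaConsumerFactory from the question (500 is the Kafka consumer's default for max.poll.records, so 25 is a large reduction; the right value is something to measure for your workload):

// Let each poll return far more records; combined with a batch listener,
// each poll can then become one SQLite transaction.
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");

Just make sure a full batch can still be processed within max.poll.interval.ms (5 minutes by default), otherwise the consumer is removed from the group and a rebalance is triggered.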
Upvotes: 1