Swastik

Reputation: 396

I need help in scaling a Kafka consumer

We have an e-commerce-related app for which we store the product details of several e-commerce stores in MariaDB. On top of MariaDB we have SQLite, which acts as a cache and stores the most popular products (20% of the products from each store are kept in the cache). The reason we used SQLite as a cache instead of a key-value store is that we generate SQL dynamically based on different filters on products, variants, and collections.
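For illustration, a query of the kind we generate might be assembled like this (a minimal sketch; `ProductQueryBuilder` and all table/column names are hypothetical, not our real schema):

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical sketch of dynamic SQL assembly from optional filters.
    final class ProductQueryBuilder {
        static String build(String collectionHandle, Double maxPrice, String optionValue) {
            List<String> predicates = new ArrayList<>();
            if (collectionHandle != null) predicates.add("c.handle = ?");
            if (maxPrice != null)         predicates.add("p.price <= ?");
            if (optionValue != null)      predicates.add("v.option_value = ?");
            String where = predicates.isEmpty() ? "" : " WHERE " + String.join(" AND ", predicates);
            return "SELECT DISTINCT p.* FROM products p"
                 + " JOIN variants v ON v.product_id = p.id"
                 + " JOIN collections c ON c.id = p.collection_id"
                 + where;
        }
    }

A key-value cache can't answer ad-hoc joins and filters like this, which is why the cache is a relational store.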

Any updates to products (e.g., title, price, etc.) are first applied to MariaDB. To keep our cache (SQLite) in sync with these updates, we use CDC (Zendesk's Maxwell). Whenever there is an update in MariaDB, Maxwell captures the change and pushes it to Kafka. We have a Spring app that consumes these changes from Kafka and updates our cache (SQLite).
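For context, Maxwell emits one JSON document per changed row. The field names below follow Maxwell's documented output format; the record type and Jackson mapping (Jackson 2.12+ can bind records directly) are only an illustrative sketch of how a consumer might deserialize the payload:

    import java.util.Map;
    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.ObjectMapper;

    // A Maxwell row event looks roughly like:
    // {"database":"shop","table":"products","type":"update","ts":1699999999,
    //  "data":{"id":42,"title":"T-shirt","price":9.99},"old":{"price":8.99}}
    public record MaxwellEvent(String database, String table, String type,
                               Map<String, Object> data, Map<String, Object> old) {

        private static final ObjectMapper MAPPER = new ObjectMapper()
                .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

        static MaxwellEvent parse(String json) throws Exception {
            return MAPPER.readValue(json, MaxwellEvent.class);
        }
    }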

Everything looked good until now, but we have seen problems when there is a bulk import of products or collections from a store into our MariaDB. In that case there are millions of upserts in MariaDB, which lead to millions of CDC messages in Kafka.

In simple terms, the rate at which Maxwell produces CDC messages into Kafka is greater than the rate at which my Spring app can consume them, which leads to huge Kafka consumer lag.

Since SQLite doesn't support multiple concurrent writers, we can't simply add more consumers to process those messages in parallel.

It would be really helpful if someone could help with the following.

  1. Is there anything wrong with the design?
  2. Is there any way to consume those millions of records faster, keeping in mind that we have only one consumer?
  3. Are there any Kafka consumer configurations to handle these scenarios?

Kafka configuration I used

    ConsumerFactory<String, String> kafkaConsumerFactory(Boolean autoCommit) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServerAddress);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Each poll() returns at most 25 records.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "25");
        return new DefaultKafkaConsumerFactory<>(props);
    }
  

  @Bean(KafkaConstants.LISTENER_CONTAINER_FACTORY)
  public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(kafkaConsumerFactory(Boolean.TRUE));
    // Single consumer thread, since SQLite allows only one writer.
    factory.setConcurrency(1);
    return factory;
  }

Kafka Consumer

  @KafkaListener(id = "myGroup", topics = {"myTopic"})
  public void productDetailsConsumer(String payload) {
    // ... process messages and push to SQLite
  }

Upvotes: 0

Views: 122

Answers (1)

Artem Bilan

Reputation: 121542

From the Spring for Apache Kafka point of view, the only optimization I see is a batch listener:

https://docs.spring.io/spring-kafka/reference/html/#batch-listeners

You can probably also poll many more records than 25 via ConsumerConfig.MAX_POLL_RECORDS_CONFIG.
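A minimal sketch of both suggestions combined, adapted from the question's configuration (the batch size of 500 and the one-transaction-per-batch write are assumptions to illustrate the idea, not tested values):

    import java.util.List;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

    @Bean(KafkaConstants.LISTENER_CONTAINER_FACTORY)
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory(Boolean.TRUE));
        factory.setConcurrency(1); // still a single writer for SQLite
        // Deliver each poll() as one List instead of record-by-record.
        factory.setBatchListener(true);
        return factory;
    }

    // Also raise max.poll.records in kafkaConsumerFactory, e.g.:
    // props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");

    @KafkaListener(id = "myGroup", topics = {"myTopic"})
    public void productDetailsConsumer(List<String> payloads) {
        // Apply the whole batch to SQLite in ONE transaction; per-row implicit
        // transactions are the usual bottleneck for SQLite writes:
        //   conn.setAutoCommit(false);
        //   for (String payload : payloads) { upsert(conn, payload); }
        //   conn.commit();
    }

With a batch listener, the single consumer should amortize both the Kafka round trips and the per-transaction SQLite fsync across hundreds of rows.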

Upvotes: 1
