Illia

Reputation: 374

Usage of Java Kafka Consumer in multiple threads

I'm thinking of using a Kafka Consumer with a thread pool, and I came up with the approach below. It seems to work fine so far, but I'm wondering about the drawbacks and what problems this approach could bring. Basically, I need to decouple record processing from consuming. I also need a strong guarantee that commits happen only after all records are processed. Could someone suggest or advise how to do this better?

    final var consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(topics);
    final var threadPool = Executors.newFixedThreadPool(32);

    while (true) {

        final ConsumerRecords<String, String> records;

        synchronized (consumer) {
            records = consumer.poll(Duration.ofMillis(100));
        }

        // pass the polled records to the task; a no-arg method reference would drop them
        CompletableFuture.runAsync(() -> processTask(records), threadPool).thenRun(() -> {
            synchronized (consumer) {
                consumer.commitSync();
            }
        });
    }

Upvotes: 2

Views: 3084

Answers (2)

user3398321

Reputation: 73

I came across the following article, which decouples the consumption and processing of records in Kafka. You can achieve this by calling the poll() method explicitly and controlling the flow of records with the pause() and resume() methods.

Processing kafka records in Multi-threaded env
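As a rough illustration of that pattern (a sketch, not code from the article; the topic name, pool size, and process() body are placeholders, and props is assumed to have enable.auto.commit=false):

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PauseResumeExample {

    static void run(Properties props) {
        ExecutorService pool = Executors.newFixedThreadPool(8); // placeholder size
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-topic")); // placeholder topic
            Future<?> inFlight = null;
            while (true) {
                // While the partitions are paused, poll() returns no records but
                // still keeps the consumer alive in the group.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (!records.isEmpty()) {
                    consumer.pause(consumer.assignment()); // stop fetching until the batch is done
                    inFlight = pool.submit(() -> process(records));
                }
                if (inFlight != null && inFlight.isDone()) {
                    consumer.commitSync();                 // every polled record is processed by now
                    consumer.resume(consumer.paused());
                    inFlight = null;
                }
            }
        }
    }

    static void process(ConsumerRecords<String, String> records) {
        // placeholder for real per-record work
    }
}
```

Because only one batch is in flight at a time, the plain commitSync() here cannot commit past unprocessed records.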

Upvotes: 0

Rishabh Sharma

Reputation: 862

Issue

This solution is not robust for the stated requirement:

Also, I need to have a strong guarantee that commits happen only after all records are processed

Scenario:

  1. Poll reads 100 records and starts processing them asynchronously
  2. Poll reads 5 more records and starts processing them asynchronously
  3. The 5 records finish immediately, and their commit runs while the 100 records are still being processed
  4. Consumer crashes

When the consumer is brought up again, the last committed offset would correspond to the 105th record. It would therefore resume from the 106th record, skipping records 1-100 even though they were never successfully processed.

You would need to commit only the offsets that you are processing in that poll via:

void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);

Also, ordering would need to be guaranteed, so that the first poll's offsets are committed first, followed by the second's, and so on. This gets fairly complicated.
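A sketch of such a per-poll commit, mirroring the manual offset control example in the KafkaConsumer javadoc (consumer and records are assumed to come from the question's loop; the committed offset must be the *next* offset to read, hence lastOffset + 1):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// After processing the records from one poll, commit exactly those offsets.
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
}
synchronized (consumer) {
    consumer.commitSync(offsets);
}
```

Even with this, you still have to serialize the commits across polls yourself, which is the complication mentioned above.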

Proposition

I believe you are trying to achieve concurrency in message processing. This can be done with a simpler solution: increase max.poll.records to read a decent-sized batch, break it into smaller batches, and process them asynchronously to achieve concurrency. Once all batches are done, commit the offsets on the Kafka consumer.
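A sketch of that flow, using plain strings as stand-ins for the polled records so the batching logic stands alone (handle() and the commit callback are placeholders; in real code the batch would come from a single poll() and the commit would be consumer.commitSync()):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BatchProcessor {

    // Split one polled batch into chunks and process them concurrently.
    // allOf(...).join() blocks until every chunk is done, so the commit that
    // follows covers only fully processed records. Returns the chunk count.
    static int processBatchThenCommit(List<String> polledRecords, int chunkSize,
                                      ExecutorService pool, Runnable commit) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < polledRecords.size(); i += chunkSize) {
            List<String> chunk =
                polledRecords.subList(i, Math.min(i + chunkSize, polledRecords.size()));
            futures.add(CompletableFuture.runAsync(
                () -> chunk.forEach(BatchProcessor::handle), pool));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        commit.run(); // e.g. consumer.commitSync() -- runs only after all chunks finished
        return futures.size();
    }

    static void handle(String record) {
        // placeholder for real per-record work
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> batch = List.of("r1", "r2", "r3", "r4", "r5");
        int chunks = processBatchThenCommit(batch, 2, pool,
            () -> System.out.println("committed after all chunks"));
        System.out.println("processed in " + chunks + " chunks");
        pool.shutdown();
    }
}
```

Since the next poll() only happens after the commit, there is a single batch in flight and the ordering problem from the Issue section disappears.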

Upvotes: 2
