Reputation: 374
I'm thinking of using a Kafka consumer with a thread pool, and I came up with the approach below. It seems to work fine so far, but I'm wondering what drawbacks and problems it could bring. Basically, what I need is to decouple record processing from consuming. Also, I need a strong guarantee that commits happen only after all records are processed. Could someone give a suggestion or advice on how to do this better?
final var consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(topics);
final var threadPool = Executors.newFixedThreadPool(32);

while (true) {
    ConsumerRecords<String, String> records;
    // Guard the consumer: KafkaConsumer is not safe for multi-threaded access
    synchronized (consumer) {
        records = consumer.poll(Duration.ofMillis(100));
    }
    // Hand processing to the pool; commit once the task completes
    CompletableFuture.runAsync(this::processTask, threadPool).thenRun(() -> {
        synchronized (consumer) {
            consumer.commitSync();
        }
    });
}
Upvotes: 2
Views: 3084
Reputation: 73
I came across the following article, which decouples the consumption and processing of records in Kafka. You can achieve this by calling the poll() method explicitly and processing the records with the help of the pause() and resume() methods.
Processing kafka records in Multi-threaded env
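A rough sketch of that pause/resume pattern (the class and method names here are mine, not from the article; taking the `Consumer` interface lets you test it, and error handling is minimal): one thread keeps polling so the consumer stays in the group, but the batch's partitions are paused until processing on the pool finishes, and only then do we commit and resume.

```java
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public class PauseResumeLoop {

    // One iteration of the poll loop: fetch a batch, pause its partitions,
    // process off-thread, commit only after processing is done, then resume.
    static void runOnce(Consumer<String, String> consumer, ExecutorService pool) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        if (records.isEmpty()) {
            return;
        }
        Set<TopicPartition> batchPartitions = records.partitions();
        consumer.pause(batchPartitions); // stop fetching from these partitions

        CompletableFuture<Void> done =
            CompletableFuture.runAsync(() -> process(records), pool);

        while (!done.isDone()) {
            // Keep calling poll() so heartbeats/rebalance handling continue;
            // paused partitions return no records.
            consumer.poll(Duration.ofMillis(100));
        }
        done.join();           // surface processing failures before committing
        consumer.commitSync(); // everything from this poll has been processed
        consumer.resume(batchPartitions);
    }

    static void process(ConsumerRecords<String, String> records) {
        // application logic goes here
    }
}
```

Note that a rebalance during the processing wait can still assign new partitions to the inner poll() calls, so a real implementation would need a rebalance listener as well.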
Upvotes: 0
Reputation: 862
This solution is not robust for the stated requirement:
Also, I need to have a strong guarantee that commits happens only after all records are processed
Scenario: suppose one poll returns records 1-100 and the next poll returns records 101-105. The second batch finishes processing first, so its callback calls commitSync() and commits up to record 105 while records 1-100 are still in flight. The consumer then crashes. When the consumer is brought up again, the last commit corresponds to the 105th record, so it will start processing from the 106th record, and we have missed out on successful processing of records 1-100.
You would need to commit only the offsets that you are processing in that poll via:
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
Also, ordering would need to be guaranteed such that the first poll is committed first, followed by the second, and so on. This would be fairly complicated.
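Building that offsets map could look something like this (the helper name `offsetsToCommit` is mine; note that Kafka expects the offset of the *next* record to consume, hence the +1):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetsToCommit {

    // For each partition in the polled batch, commit the offset of the
    // next record to consume: the last processed offset + 1.
    static Map<TopicPartition, OffsetAndMetadata> offsetsToCommit(
            ConsumerRecords<String, String> records) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (TopicPartition tp : records.partitions()) {
            List<ConsumerRecord<String, String>> perPartition = records.records(tp);
            long lastOffset = perPartition.get(perPartition.size() - 1).offset();
            offsets.put(tp, new OffsetAndMetadata(lastOffset + 1));
        }
        return offsets;
    }

    // After the batch is fully processed:
    //   consumer.commitSync(offsetsToCommit(records));
}
```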
I believe you are trying to achieve concurrency in message processing. This can be done with a simpler solution: increase max.poll.records to read a decent-sized batch, break it into smaller batches, process those asynchronously, and commit via the consumer once all of them are done.
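That suggestion might be sketched roughly like this (the class name, the chunk size of 100, and the `chunk` helper are all illustrative choices of mine): poll a large batch, fan the sub-batches out to a pool, wait for every future, and only then commit.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class BatchProcessingLoop {

    // Split a list into fixed-size chunks (last chunk may be smaller).
    static <T> List<List<T>> chunk(List<T> items, int size) {
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < items.size(); i += size) {
            chunks.add(items.subList(i, Math.min(i + size, items.size())));
        }
        return chunks;
    }

    // One iteration: poll a big batch (sized by max.poll.records), process
    // sub-batches concurrently, commit only after all of them succeed.
    static void runOnce(Consumer<String, String> consumer, ExecutorService pool) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        if (records.isEmpty()) {
            return;
        }
        List<ConsumerRecord<String, String>> batch = new ArrayList<>();
        records.forEach(batch::add);

        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (List<ConsumerRecord<String, String>> sub : chunk(batch, 100)) {
            futures.add(CompletableFuture.runAsync(() -> process(sub), pool));
        }
        // Block until every sub-batch succeeds; a failure propagates from
        // join() and skips the commit, so nothing is lost on retry.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        consumer.commitSync();
    }

    static void process(List<ConsumerRecord<String, String>> subBatch) {
        // application logic goes here
    }
}
```

Since the single consumer thread blocks until the whole poll is processed, ordering of commits across polls is trivially preserved, which is what makes this simpler than committing per-task.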
Upvotes: 2