VR1256

Reputation: 1450

Kafka consumer design to process huge volume of data with multi instance

I am trying to design Kafka consumers, and I have hit a roadblock on how to design the process. I am considering two options:

1.  Process records directly from Kafka.
2.  Write records from Kafka to a staging table, then process them.

Approach 1: Process Key messages on the go from Kafka:

•   Read messages one at a time from Kafka; break the loop if there are no records left to process (the number of messages to process is configurable).
•   Execute business rules.
•   Apply changes to the consumer database.
•   Update the Kafka offset after processing each message.
•   Insert into a staging table (used for PD guide later on).
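The steps above can be sketched roughly as follows. This is a simulation only, not Kafka client code: `FakePartition`, `run_consumer`, and the `fail_on` switch are hypothetical names invented for illustration, and the "database" and "staging table" are a plain dict and list. It shows that committing the offset only after processing means a failed message is read again on the next run.

```python
from dataclasses import dataclass


@dataclass
class FakePartition:
    """In-memory stand-in for one Kafka partition (hypothetical, not a Kafka API)."""
    records: list
    committed: int = 0  # offset of the next record to read after a restart


def run_consumer(partition, db, staging, max_messages, fail_on=None):
    """Process up to max_messages records, committing after each one."""
    position = partition.committed  # resume from the last committed offset
    processed = 0
    while processed < max_messages and position < len(partition.records):
        key, amount = partition.records[position]  # read one message
        if key == fail_on:
            # Simulated business-rule failure: the offset is NOT committed.
            raise RuntimeError(f"processing failed for key {key!r}")
        db[key] = amount                 # apply the change to the consumer database
        position += 1
        partition.committed = position   # commit only after the work is done
        staging.append((key, amount))    # staging/audit row
        processed += 1
    return processed
```

If a run fails on the second message, `partition.committed` stays at 1, so the next run starts again from that message rather than skipping it.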

Questions with above approach:

•   Is it OK to subscribe to a partition and keep the lock open on the Kafka partition until a configurable number of messages are processed,
    and then apply business rules and apply changes to the database? Everything happens in the same process. Are there any performance issues with doing it this way?
•   Is it OK to manually commit the offset to Kafka? (Are there performance issues with manual offset commits?)

Approach 2: Write from Kafka to a staging table and process records

Process 1: Consume events from Kafka and put them in a staging table.
Process 2: Read the staging table (configurable number of rows), execute business rules, apply consumer database changes,
and update the status of processed records in the staging table. (We may have multiple processes doing this step.)
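A minimal sketch of the two processes, assuming an SQLite staging table with a `status` column. The table layout and the function names (`make_staging`, `stage_events`, `process_batch`) are made up for illustration, and the Kafka side is replaced by a plain list of payloads.

```python
import sqlite3


def make_staging(conn):
    """Create a hypothetical staging table with a per-row status."""
    conn.execute(
        "CREATE TABLE staging ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " payload TEXT NOT NULL,"
        " status TEXT NOT NULL DEFAULT 'NEW')"
    )


def stage_events(conn, payloads):
    """Process 1: copy consumed events into the staging table."""
    with conn:  # commits on success, rolls back on error
        conn.executemany(
            "INSERT INTO staging (payload) VALUES (?)",
            [(p,) for p in payloads],
        )


def process_batch(conn, batch_size, apply_rules):
    """Process 2: read a batch of NEW rows, apply rules, mark them DONE."""
    rows = conn.execute(
        "SELECT id, payload FROM staging WHERE status = 'NEW' "
        "ORDER BY id LIMIT ?",
        (batch_size,),
    ).fetchall()
    with conn:
        for row_id, payload in rows:
            apply_rules(payload)  # business rules + consumer database changes
            conn.execute(
                "UPDATE staging SET status = 'DONE' WHERE id = ?", (row_id,)
            )
    return len(rows)
```

This sketch assumes a single worker. With several instances of Process 2, each worker would also need a way to claim rows so that two workers do not pick up the same batch.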

I see a lot of downsides to this approach:

•   We lose the advantage of the offset handling provided by Kafka, and we manually update the status of processed records in the staging table.
•   Locking and blocking on the staging table with multiple instances, as we insert and then update after processing in the same staging table
    (note: I could design separate tables, move the data there, and process it, but that introduces multiple processes again).

How should I design Kafka consumers with multiple instances and a huge volume of data to process? Which design is appropriate: reading data on the go from Kafka and processing the messages, or staging it to a table and writing another job to process these messages?

Upvotes: 2

Views: 2461

Answers (3)

Mukul Bansal

Reputation: 936

This is how I think we can get the best throughput without worrying about losing messages:

  1. Maximize the number of partitions.
  2. Deploy the consumers (at most as many as the number of partitions, or fewer if your consumers can operate multi-threaded without any problem).
  3. Read single-threaded within each consumer (with auto offset commit) and put the messages in a blocking queue, which you can size based on the number of actual processing threads in each consumer.
  4. If processing fails, you can retry until it succeeds or else put the messages in a dead-letter queue. Don't forget to implement shutdown hooks that process already consumed messages.
  5. If you want to ensure ordering from a single partition, such as processing events with the same key one after another (or ordering on any other factor), you can use a deterministic executor. I have written a basic ExecutorService in Java that can execute multiple messages in a deterministic way without compromising on the multi-threading of logically separate events. Link: https://github.com/mukulbansal93/deterministic-threading
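To make points 3 to 5 concrete, here is a rough sketch of a keyed worker pool built on bounded queues. It is not the implementation from the linked repository; `KeyedWorkerPool` and its methods are hypothetical names, and the routing rule (CRC32 of the key modulo the worker count) is an assumption chosen for the example.

```python
import queue
import threading
import zlib


class KeyedWorkerPool:
    """Routes each key to one fixed worker so same-key messages run in order."""

    def __init__(self, workers, handler, queue_size=100):
        self._handler = handler
        # One bounded queue per worker; a full queue applies back-pressure.
        self._queues = [queue.Queue(maxsize=queue_size) for _ in range(workers)]
        self._threads = [
            threading.Thread(target=self._run, args=(q,), daemon=True)
            for q in self._queues
        ]
        for t in self._threads:
            t.start()

    def _run(self, q):
        while True:
            item = q.get()
            if item is None:  # shutdown sentinel
                break
            self._handler(*item)

    def submit(self, key, value):
        # crc32 is stable across runs, unlike Python's salted hash() for str.
        index = zlib.crc32(key.encode("utf-8")) % len(self._queues)
        self._queues[index].put((key, value))  # blocks while the queue is full

    def shutdown(self):
        """Let workers drain already-submitted messages, then stop them."""
        for q in self._queues:
            q.put(None)
        for t in self._threads:
            t.join()
```

Messages with different keys can run in parallel on different workers, while messages sharing a key always pass through the same queue and so keep their submission order. Retries and a dead-letter queue are left out to keep the sketch short.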

To answer your questions:

  1. Is it OK to subscribe to a partition and keep the lock open on the Kafka partition until a configurable number of messages are processed, and then apply business rules and apply changes to the database? Everything happens in the same process; are there any performance issues with doing it this way?
     I don't see major performance issues here, as you are processing in bulk. However, it is possible that one of your consumed messages takes a long time while the others get processed. In that case, you will not read other messages from Kafka, leading to a performance bottleneck.
  2. Is it OK to manually commit the offset to Kafka? (Performance issues with manual offset commit.)
     This is definitely going to be the lowest-throughput approach, as offset committing is an expensive operation.

Upvotes: 1

glitch99

Reputation: 304

The design of your consumer depends on your use case. If there are other downstream processes that expect the same data but are limited in their ability to connect to your Kafka cluster, then having a staging table is a good idea.

I think in your case approach 1, with a little alteration, is a good way to go. However, you don't need to break the loop if there are no new messages in the topic. Also, there is a consumer property, max.poll.records (default 500), that sets the maximum number of records returned by a single poll. You might want to lower it if each message takes a long time to process (to avoid timeouts or unwanted rebalances).

Since you mentioned that the amount of data is huge, I would recommend having more partitions for concurrency if processing order does not matter to you. Concurrency can be achieved by creating a consumer group with an instance count no greater than the number of partitions for the topic. (If the consumer instance count is greater than the number of partitions, the extra instances will be idle.)
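As a toy illustration of why extra instances sit idle, the function below spreads partitions over instances round-robin. `assign_partitions` is a hypothetical helper; Kafka's real assignors use their own strategies, but the limit of one consumer per partition within a group is the same.

```python
def assign_partitions(num_partitions, instances):
    """Round-robin partitions across instances; returns {instance: [partitions]}."""
    assignment = {name: [] for name in instances}
    for p in range(num_partitions):
        # Each partition goes to exactly one instance in the group.
        assignment[instances[p % len(instances)]].append(p)
    return assignment


# Four instances, three partitions: the fourth instance gets nothing.
print(assign_partitions(3, ["c0", "c1", "c2", "c3"]))
```

With six partitions and two instances, each instance would own three partitions instead.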

If order does matter, the producer should ideally send logically grouped messages with the same message key so that all messages with the same key land in the same partition.

About offset committing: if you sync-commit each message to Kafka, you will definitely see a performance impact. Usually the offset is committed for each consumed batch of records, e.g. poll 500 records -> process -> commit the batch. However, if you need to send a commit for each message, you might want to opt for async commit.
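To put rough numbers on the difference, this small simulation counts how many commits each strategy issues. `consume` is a hypothetical function and no broker is involved; it only counts calls and does not measure latency.

```python
def consume(records, batch_size, commit_per_message):
    """Return (processed, commits) for a simulated poll/process/commit loop."""
    processed = commits = 0
    for start in range(0, len(records), batch_size):
        batch = records[start:start + batch_size]  # one simulated poll
        for _ in batch:
            processed += 1          # process the record
            if commit_per_message:
                commits += 1        # one commit round trip per record
        if not commit_per_message:
            commits += 1            # one commit covers the whole batch
    return processed, commits
```

For 1200 records polled 500 at a time, committing per message issues 1200 commits, while committing per batch issues 3.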

Additionally, when partitions are assigned to a consumer group instance, the partitions are not locked. Other consumer groups can subscribe to the same topic and consume messages concurrently.

Upvotes: 2

Boaz

Reputation: 1470

The first approach where you consume the data and update a table accordingly sounds like the right way.

Kafka guarantees

At least once: you may get the same message twice.
That means your message handling needs to be idempotent -> set the amount to x rather than adding an amount to the previous value.
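A small example of the difference, assuming a message that carries a user and an amount. The names `apply_set` and `apply_add` are invented for illustration.

```python
def apply_set(balances, user, amount):
    """Idempotent: applying the same message twice gives the same result."""
    balances[user] = amount


def apply_add(balances, user, delta):
    """Not idempotent: a redelivered message is counted twice."""
    balances[user] = balances.get(user, 0) + delta


# The same message delivered twice, as at-least-once delivery allows.
delivered = [("alice", 100), ("alice", 100)]

set_result, add_result = {}, {}
for user, amount in delivered:
    apply_set(set_result, user, amount)
    apply_add(add_result, user, amount)

print(set_result)  # {'alice': 100}
print(add_result)  # {'alice': 200}
```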

Order (per partition): Kafka promises that you consume messages in the same order in which they were produced, per partition. Like a queue per partition.
If "Execute business rules" also requires reading previous writes, you need to process the messages one by one.

How to define the partitions

If you define one partition, you won't have a problem with conflicts, but you will only have one consumer, and that doesn't scale.
If you arbitrarily define multiple partitions, then you may lose the order.
Why is that a problem?
You need to define the partitions according to your business model. For example, let's say that every message updates some user's data in the DB. When you process a message, you want to read the user row, check some fields, and then update (or not) according to those fields.
That means that if you define the partition by user ID -> (user-id % number of partitions),
you guarantee that you won't have a race condition between two updates on the same user, and you can scale to multiple machines/processes/threads. Each consumer is in charge of some set of users, but it's always the same users.
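The modulo rule above can be sketched like this, assuming numeric user IDs. `partition_for` and `group_by_partition` are hypothetical helpers; Kafka's default partitioner hashes the key bytes rather than taking a plain modulo, but the effect of keeping one user on one partition is the same.

```python
from collections import defaultdict


def partition_for(user_id, num_partitions):
    """Map a numeric user ID to a partition, as in (user-id % number of partitions)."""
    return user_id % num_partitions


def group_by_partition(messages, num_partitions):
    """Group (user_id, payload) messages by partition, keeping arrival order."""
    partitions = defaultdict(list)
    for user_id, payload in messages:
        partitions[partition_for(user_id, num_partitions)].append((user_id, payload))
    return dict(partitions)


messages = [(7, "create"), (12, "create"), (7, "update"), (12, "delete"), (9, "create")]
print(group_by_partition(messages, 4))
```

Both messages for user 7 end up in partition 3 in their original order, so a single consumer sees "create" before "update" for that user.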

Upvotes: 1
