Mahesha999

Reputation: 24731

When to use Kafka transactional API?

I was trying to understand Kafka's transactional API. This link defines an atomic read-process-write cycle as follows:

First, let’s consider what an atomic read-process-write cycle means. In a nutshell, it means that if an application consumes a message A at offset X of some topic-partition tp0, and writes message B to topic-partition tp1 after doing some processing on message A such that B = F(A), then the read-process-write cycle is atomic only if messages A and B are considered successfully consumed and published together, or not at all.

It further says the following:

Using vanilla Kafka producers and consumers configured for at-least-once delivery semantics, a stream processing application could lose exactly once processing semantics in the following ways:

  1. The producer.send() could result in duplicate writes of message B due to internal retries. This is addressed by the idempotent producer and is not the focus of the rest of this post.

  2. We may reprocess the input message A, resulting in duplicate B messages being written to the output, violating the exactly once processing semantics. Reprocessing may happen if the stream processing application crashes after writing B but before marking A as consumed. Thus when it resumes, it will consume A again and write B again, causing a duplicate.

  3. Finally, in distributed environments, applications will crash or—worse!—temporarily lose connectivity to the rest of the system. Typically, new instances are automatically started to replace the ones which were deemed lost. Through this process, we may have multiple instances processing the same input topics and writing to the same output topics, causing duplicate outputs and violating the exactly once processing semantics. We call this the problem of “zombie instances.”

We designed transaction APIs in Kafka to solve the second and third problems. Transactions enable exactly-once processing in read-process-write cycles by making these cycles atomic and by facilitating zombie fencing.
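
For context, the zombie fencing mentioned above hinges on the producer's transactional.id. Below is a minimal setup sketch; the broker address and id value are illustrative, not from the post:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.serialization.StringSerializer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // illustrative broker address
    props.put("transactional.id", "my-processor-1");   // stable id per logical producer instance
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    // initTransactions() registers the transactional.id with the broker and bumps
    // its epoch; any older ("zombie") instance with the same id is fenced and gets
    // a ProducerFencedException on its next transactional call.
    producer.initTransactions();

Setting transactional.id also implicitly enables idempotence on the producer.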

Doubts:

  1. Points 2 and 3 above describe scenarios in which message duplication can occur; these are the cases the transactional API deals with. Does the transactional API also help to avoid message loss in any scenario?

  2. Most online examples of the Kafka transactional API (for example, here and here) involve:

    // Assumes producer.initTransactions() was called once at startup and that
    // the consumer has enable.auto.commit=false; producerRecord() and
    // currentOffsets() are helper methods from the original example.
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        producer.beginTransaction();
        for (ConsumerRecord<String, String> record : records)
            producer.send(producerRecord("outputTopic", record));
        // Committing the consumed offsets inside the transaction makes the
        // read and the write atomic: both happen, or neither does.
        producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
        producer.commitTransaction();
    }
    

    This is basically a read-process-write loop. So is the transactional API useful only in a read-process-write loop?

  3. This article gives an example of the transactional API in a non-read-process-write scenario:

     producer.initTransactions();
     try {
         producer.beginTransaction();
         producer.send(record1);
         producer.send(record2);
         // Both records become visible to consumers atomically on commit.
         producer.commitTransaction();
     } catch (ProducerFencedException e) {
         // Another producer with the same transactional.id has started;
         // this instance is a zombie and must be closed.
         producer.close();
     } catch (KafkaException e) {
         // Any other failure: abort so that neither record ever becomes visible.
         producer.abortTransaction();
     }
    

    It says:

    This allows a producer to send a batch of messages to multiple partitions such that either all messages in the batch are eventually visible to any consumer or none are ever visible to consumers.

    Is this example correct, and does it show another way to use the transactional API, different from the read-process-write loop? (Note that it also does not commit offsets to the transaction.)

  4. In my application, I simply consume messages from Kafka, do some processing, and log them to the database. That is my whole pipeline.

    a. So, I guess this is not a read-process-write cycle. Is the Kafka transactional API of any use in my scenario?

    b. Also, I need to ensure that each message is processed exactly once. I guess setting enable.idempotence=true on the producer will suffice and I don't need the transactional API, right?

    c. I may run multiple instances of the pipeline, but I am not writing processing output to Kafka. So I guess this will never involve zombies (duplicate producers writing to Kafka), and the transactional API won't help me avoid duplicate processing, right? (I might have to persist both the offset and the processing output to the database in the same database transaction, and read the offset back on restart to avoid duplicate processing; see the sketch below.)
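
A minimal sketch of the pattern from 4c, assuming hypothetical results and offsets tables; the process() helper is a stand-in for the actual processing step:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    // Hypothetical stand-in for the application's processing step.
    String process(String value) { return value; }

    // Persist the processing output and the consumed offset in one DB transaction,
    // so that after a crash both are saved together or not at all.
    void writeWithOffset(Connection conn, ConsumerRecord<String, String> record) throws SQLException {
        conn.setAutoCommit(false);
        try (PreparedStatement out = conn.prepareStatement(
                 "INSERT INTO results (payload) VALUES (?)");
             PreparedStatement off = conn.prepareStatement(
                 "UPDATE offsets SET next_offset = ? WHERE topic = ? AND kafka_partition = ?")) {
            out.setString(1, process(record.value()));
            out.executeUpdate();
            off.setLong(1, record.offset() + 1);  // offset to resume from after a restart
            off.setString(2, record.topic());
            off.setInt(3, record.partition());
            off.executeUpdate();
            conn.commit();                        // output and offset become visible together
        } catch (SQLException e) {
            conn.rollback();                      // neither output nor offset is saved
            throw e;
        }
    }

On restart, the application would read next_offset from the database and consumer.seek() to it, rather than relying on Kafka's committed offsets.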

Upvotes: 4

Views: 4653

Answers (1)

Radu Stefan Popescu

Reputation: 197

a. So, I guess this is not a read-process-write cycle. Is the Kafka transactional API of any use in my scenario?

It is a read-process-write cycle, except you are writing to a database instead of to Kafka. Kafka has its own transaction manager, so writing inside a transaction with idempotence enabled gives exactly-once processing, assuming you can correctly resume the state of your consume-write processor. You cannot do that with a DB, because the DB's transaction manager doesn't sync with Kafka's. What you can do instead is make sure that even if Kafka transactions are not atomic with respect to your database, they are still eventually consistent.

Let's assume your consumer reads, writes to the DB, and then acks (commits the offset). If the DB write fails, you don't ack, and you can resume normally based on the offset. If the ack fails, you will process the message twice and save to the DB twice. If you can make this operation idempotent, then you are safe. This means that your processor must be pure and the DB has to dedupe: processing the same message twice should always lead to the same result in the DB.
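
A minimal sketch of that dedupe, assuming a PostgreSQL-style table keyed by (topic, partition, offset); all names are illustrative:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    // Assumed schema (PostgreSQL syntax):
    //   CREATE TABLE processed (topic TEXT, kafka_partition INT, kafka_offset BIGINT,
    //                           result TEXT, PRIMARY KEY (topic, kafka_partition, kafka_offset));
    void saveIdempotently(Connection conn, ConsumerRecord<String, String> rec, String result)
            throws SQLException {
        String sql = "INSERT INTO processed (topic, kafka_partition, kafka_offset, result) "
                   + "VALUES (?, ?, ?, ?) ON CONFLICT DO NOTHING";  // a redelivered message is a no-op
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, rec.topic());
            ps.setInt(2, rec.partition());
            ps.setLong(3, rec.offset());
            ps.setString(4, result);
            ps.executeUpdate();
        }
    }

Because rows are only ever inserted under their offset key, a replayed message can never overwrite a newer value, which is also the log-like table shape point b below suggests.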

b. Also, I need to ensure that each message is processed exactly once. I guess setting enable.idempotence=true on the producer will suffice and I don't need the transactional API, right?

Assuming that you respect the requirements from point a, exactly-once processing with persistence in a different store also requires that, between your initial write and the duplicate, no other change has happened to the objects you are saving. Imagine a value written as X; then some other actor changes it to Y; then the message is reprocessed and changes it back to X. This can be avoided, for example, by making your database table a log, similar to a Kafka topic.

c. I may run multiple instances of the pipeline, but I am not writing processing output to Kafka. So I guess this will never involve zombies (duplicate producers writing to Kafka), and the transactional API won't help me avoid duplicate processing, right? (I might have to persist both the offset and the processing output to the database in the same database transaction, and read the offset back on restart to avoid duplicate processing.)

It is the producer which writes to the topic you consume from that may create zombie messages. That producer needs to play nice with Kafka so that zombie writes are ignored. The transactional API, together with your consumer, will make sure that this producer writes atomically and that your consumer reads only committed messages, albeit not atomically. If you want exactly-once, idempotence is enough. If the messages are supposed to be written atomically, you need transactions too. Either way, your read-write/consume-produce processor needs to be pure and you have to dedupe. Your DB is also part of this processor, since the DB is what actually persists the result.
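
On the consumer side, reading only committed messages is a matter of configuration. A minimal sketch (the broker address and group id are illustrative):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // illustrative broker address
    props.put("group.id", "db-writer");                // illustrative group id
    props.put("isolation.level", "read_committed");    // skip messages from aborted transactions
    props.put("enable.auto.commit", "false");          // ack only after the DB write succeeds
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);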

I've looked around a bit on the internet; maybe this link helps you: processing guarantees

The links you posted (exactly once semantics and transactions in kafka) are great.

Upvotes: 2
