tryx

Reputation: 995

How to select a transactional.id for a REST API that transactionally publishes to Kafka?

We have a fleet of REST API servers that act as the entry points to our event-sourced microservice architecture. In several of these REST endpoints, we need to be able to atomically write to several Kafka topics. Our understanding is that Kafka transactions are the correct (and only) way to do this.

Our understanding of the transaction semantics is as follows:

Most of the publicly available resources about transactional Kafka address the stream-computation use case (read-process-write over commit logs), which is not ours (we use kafka-streams for that). We specifically care about safely publishing data that originates outside Kafka.
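For concreteness, here is a minimal sketch of the kind of write we mean, using the plain Java producer (the broker address, topic names, and per-node transactional.id are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.errors.AuthorizationException;
    import org.apache.kafka.common.errors.OutOfOrderSequenceException;
    import org.apache.kafka.common.errors.ProducerFencedException;

    public class AtomicMultiTopicPublish {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            // The open question: what should this be on each API node?
            props.put("transactional.id", "rest-api-node-1");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<>(props);
            producer.initTransactions(); // registers the transactional.id and fences any zombie using it

            try {
                producer.beginTransaction();
                // Both sends commit or abort together, across topics.
                producer.send(new ProducerRecord<>("topic-a", "some-key", "event-1"));
                producer.send(new ProducerRecord<>("topic-b", "some-key", "event-2"));
                producer.commitTransaction();
            } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                producer.close(); // fatal for this producer instance
            } catch (KafkaException e) {
                producer.abortTransaction(); // transient: abort and retry
            }
        }
    }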

Our Environment:

Thus our questions are:

Upvotes: 0

Views: 982

Answers (2)

Michael Carter

Reputation: 36

I think you can still use a unique node id for your transactional.ids, and your use case seems appropriate for Kafka transactions.

If I'm understanding your question correctly, you're concerned that a Producer with a given transactional.id may unexpectedly fail and leave an open transaction lying around that will block consumers operating in read_committed mode. Normally, you might expect the Producer to come back to life and re-register its transactional.id, which would cause any open transactions to be aborted, but in your case the Producer may simply never come back, due to a scale-down in the number of your API nodes.

There are several configuration values that help with this situation. The main one is the producer config:

transaction.timeout.ms

which is the maximum time the transaction coordinator will wait for an update from a producer before aborting a transaction. The current default for that is 60000 ms, but you may wish to reduce it if it makes sense in your situation. After the transaction is aborted, consumers should become unblocked.
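For example, the relevant fragment of the producer configuration (the 15-second value here is arbitrary; tune it to your longest expected transaction):

    Properties props = new Properties();
    // ... bootstrap.servers, serializers, transactional.id as usual ...
    // Have the coordinator abort an open transaction after 15s of
    // producer silence instead of the 60s default, so read_committed
    // consumers are unblocked sooner after a node disappears.
    props.put("transaction.timeout.ms", "15000");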

You may also be interested in the broker-level configs transactional.id.expiration.ms and transaction.abort.timed.out.transaction.cleanup.interval.ms.

See the Kafka doc for descriptions of these: https://kafka.apache.org/documentation/

Or read the original KIP for a bit more detail: https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging

Upvotes: 2

Yannick

Reputation: 1408

From my understanding, your pipeline looks like this:

REST API -> Kafka producer (atomic write) -> Kafka cluster

In order to write atomically to Kafka with a KafkaProducer, you can enable producer idempotence (the broker keeps a small per-producer cache of sequence numbers to discard duplicates) and, of course, use acks=all (this ensures your records are replicated to all in-sync replicas before being acknowledged).
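For example, a minimal sketch of such a producer (broker address, topic, and key/value here are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class IdempotentPublish {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("enable.idempotence", "true"); // broker drops duplicates caused by retries
            props.put("acks", "all");                // wait for all in-sync replicas
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<>(props);
            producer.send(new ProducerRecord<>("my-topic", "some-key", "some-value"));
            producer.close();
        }
    }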

Here is some documentation on the idempotent producer:

https://www.cloudkarafka.com/blog/2019-04-10-apache-kafka-idempotent-producer-avoiding-message-duplication.html

Kafka transactions are useful when doing read-process-write within the same Kafka cluster, like the following:

Kafka cluster --> read -- > KafkaConsumer -- Transform --> KafkaProducer --> same Kafka cluster
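In that pattern, the producer also commits the consumer's offsets inside the transaction, roughly like this (a sketch only; transform() and the group/topic names are placeholders):

    // Assumes a KafkaConsumer subscribed to the input topic with
    // enable.auto.commit=false, and a transactional KafkaProducer
    // that has already called initTransactions().
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        if (records.isEmpty()) continue;

        producer.beginTransaction();
        for (ConsumerRecord<String, String> record : records) {
            producer.send(new ProducerRecord<>("output-topic", record.key(), transform(record.value())));
        }

        // Commit the consumed offsets inside the same transaction, so the
        // read and the write succeed or fail as one unit.
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (TopicPartition tp : records.partitions()) {
            List<ConsumerRecord<String, String>> part = records.records(tp);
            offsets.put(tp, new OffsetAndMetadata(part.get(part.size() - 1).offset() + 1));
        }
        producer.sendOffsetsToTransaction(offsets, "transform-group");
        producer.commitTransaction();
    }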

If I'm correct, you don't need to use Kafka transactions, and thus don't need to worry about transactional.id.

Yannick

Upvotes: 0
