Reputation: 995
We have a fleet of REST API servers that act as the entry-points to our event-sourced micro-service architecture. In several of these REST endpoints, we need to be able to atomically write to several Kafka topics. Our understanding is that Kafka transactions are the correct (and only) way to do this.
Our understanding of the transaction semantics is as follows:
- Every transaction is tied to a specific transactional.id. This identifier acts as Kafka's internal token for this producer's transaction context.
- A transactional.id must always be occupied. If this were not the case, a consumer in READ_COMMITTED mode would block on a topic if a transaction were started by a producer which later died, and whose transactional.id was not picked up by another node.
- When a producer registers a transactional.id that is currently in use, Kafka increments that transactional.id's epoch number, fences out any other producers that may be using that id, and closes any open transactions with stale epoch numbers.

Most of the publicly available resources about transactional Kafka address the use-case of stream computation for commit logs, which is not our use-case (we use kafka-streams for this). We specifically care about safely publishing data from outside Kafka.
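Concretely, what we want a REST handler to do is roughly the following sketch (the topic names, keys, and the transactional.id below are placeholders; choosing the transactional.id is exactly the part we are unsure about):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class AtomicMultiTopicWrite {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");       // placeholder
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "rest-api-producer-1");   // how to pick this is the open question
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();   // claims the transactional.id and bumps its epoch
            try {
                producer.beginTransaction();
                // Both sends belong to one transaction: a READ_COMMITTED consumer
                // sees them together after commit, or not at all.
                producer.send(new ProducerRecord<>("orders-events", "order-42", "created"));     // placeholder topics
                producer.send(new ProducerRecord<>("billing-events", "order-42", "invoiced"));
                producer.commitTransaction();
            } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                // Fatal: e.g. another producer claimed this transactional.id.
                // Don't abort; just let try-with-resources close the producer.
                throw e;
            } catch (KafkaException e) {
                producer.abortTransaction();   // the partial write is never exposed to READ_COMMITTED consumers
                throw e;
            }
        }
    }
}
```

The open question is what value to give TRANSACTIONAL_ID_CONFIG when the set of nodes is constantly changing.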
Our Environment:
Thus our questions are:
- How do we allocate transactional.ids correctly in this context? Because of the nature of auto-scaling, we cannot use our node number as a factor in the transactional.id, because as soon as we reach a new high-water mark of nodes, any time we scale down we risk having un-allocated transactional.ids floating around.

Upvotes: 0
Views: 982
Reputation: 36
I think you can still use a unique node id for your transactional.ids, and your use case seems appropriate for Kafka transactions.
If I'm understanding your question correctly, you're concerned that a Producer with a given transactional.id may unexpectedly fail and leave an open transaction lying around that will block consumers operating in read_committed mode. Normally, you might expect the Producer to come back to life and re-register its transactional.id, which would cause any open transactions to abort, but in your case the Producer may simply never come back, due to a scale-down in the number of your API nodes.
There are several configuration values that help with this situation. The main one is the producer config transaction.timeout.ms, which is the maximum time the transaction coordinator will wait for an update from a producer before aborting a transaction. The current default for that is 60000 ms, but you may wish to reduce it if it makes sense in your situation. After the transaction is aborted, consumers should become unblocked.
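For illustration, you would set it on the producer alongside its transactional settings, something like this (a minimal sketch; the id and the 15-second value are placeholders, not recommendations):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class TransactionalProducerConfig {
    // Illustrative values only; bootstrap.servers, serializers, etc. omitted.
    static Properties transactionalProducerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "rest-api-producer-1");   // placeholder id
        // Abort a transaction that gets no status update from its producer within 15 s
        // (instead of the default 60000 ms), so read_committed consumers unblock sooner
        // after a node disappears.
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 15_000);
        return props;
    }
}
```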
You may also be interested in the broker-level configs transactional.id.expiration.ms and transaction.abort.timed.out.transaction.cleanup.interval.ms.
See the Kafka doc for descriptions of these: https://kafka.apache.org/documentation/
Or read the original KIP for a bit more detail: https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
Upvotes: 2
Reputation: 1408
From my understanding, your pipeline looks like this:
REST API -> Kafka producer (write atomically) -> Kafka cluster
In order to write atomically to Kafka using a KafkaProducer, you can enable producer idempotence (this uses a broker-side cache to avoid duplicates) and, of course, use acks=all (this ensures your records are acknowledged by all in-sync replicas).
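A minimal sketch of that configuration (the broker address and topic are placeholders; note that enabling idempotence requires acks=all):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class IdempotentProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);              // broker discards duplicated retries per partition
        props.put(ProducerConfig.ACKS_CONFIG, "all");                           // required by idempotence: wait for the in-sync replicas
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("events", "order-42", "created"));   // placeholder topic
        }
    }
}
```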
Here is some doc for the idempotent producer:
Kafka transactions are useful when doing a Read-Process-Write cycle within the same Kafka cluster, like the following:
Kafka cluster --> read --> KafkaConsumer -- Transform --> KafkaProducer --> same Kafka cluster
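For illustration, a transactional read-process-write loop looks roughly like this (topics, group id, and the trivial transform are made up; this assumes kafka-clients 2.5+ for sendOffsetsToTransaction with consumer group metadata):

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ReadProcessWrite {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");          // placeholder
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "transform-app");                    // placeholder group
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);                    // offsets go in the transaction
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");          // placeholder
        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transform-app-producer-1"); // placeholder
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(List.of("input"));                                              // placeholder topic
            producer.initTransactions();

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (records.isEmpty()) {
                    continue;
                }
                producer.beginTransaction();
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                for (ConsumerRecord<String, String> record : records) {
                    producer.send(new ProducerRecord<>("output", record.key(), record.value().toUpperCase()));
                    offsets.put(new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1));
                }
                // The consumed offsets are committed atomically with the produced records.
                producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                producer.commitTransaction();
            }
        }
    }
}
```

The output records and the consumed offsets commit or abort together, which is the guarantee transactions were designed to provide for this pattern.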
If I'm correct, you don't need to use Kafka transactions, and thus don't need to worry about transactional.id.
Yannick
Upvotes: 0