I'm trying to configure exactly-once semantics in Kafka (Apache Beam). Here are the changes I'm going to introduce:
Producer:
enable.idempotence = true
transactional.id = uniqueTransactionalId
Consumer:
enable.auto.commit = false
// added the following to the consumer builder:
.commitOffsetsInFinalize()
.withReadCommitted()
Added the following to the KafkaIO#write builder:
.withEOS(numShards, sinkGroupId)
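For reference, the consumer-side settings above map roughly onto these raw Kafka consumer properties. This is a minimal sketch using plain `java.util` maps, assuming that `.withReadCommitted()` amounts to setting `isolation.level` to `read_committed`. The `group.id` value and the `ConsumerEosProps` helper are hypothetical; a group id is included because, as far as I know, `.commitOffsetsInFinalize()` needs a consumer group to commit offsets to.

```java
import java.util.HashMap;
import java.util.Map;

class ConsumerEosProps {
  // Builds the raw consumer properties implied by the KafkaIO.Read settings above.
  // ConsumerEosProps and the group id passed in are hypothetical, for illustration only.
  static Map<String, Object> build(String groupId) {
    Map<String, Object> props = new HashMap<>();
    // Offsets are committed by Beam when a checkpoint is finalized, not by the Kafka client.
    props.put("enable.auto.commit", false);
    // Assumed effect of withReadCommitted(): skip records from aborted or open transactions.
    props.put("isolation.level", "read_committed");
    // Consumer group that the finalized offsets are committed under.
    props.put("group.id", groupId);
    return props;
  }

  public static void main(String[] args) {
    System.out.println(build("my-consumer-group"));
  }
}
```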
Does anyone know what else should be changed to achieve exactly-once semantics in Apache Beam KafkaIO?
Does the configuration above look fine, or have I misunderstood something?
Do I need to specify the transactional.id property if I don't use the transactions API (I don't have an explicit producer in Apache Beam)?
Well, it looks like I finally found the settings that fit my requirements. Here is what I ended up with:
1) KafkaIO.Read:
enable.auto.commit = false
.withReadCommitted()
.commitOffsetsInFinalize()
2) KafkaIO#write:
.withEOS(numShards, sinkGroupId)
withEOS also enables idempotence and sets transactional.id for the producer under the hood.
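To illustrate the point above, here is a minimal stdlib sketch of the producer overrides that `withEOS(...)` is assumed to apply per shard: start from the user's producer config and force `enable.idempotence` and a `transactional.id` on top. The `EosProducerProps` helper and the `<sinkGroupId>-shard-<n>` id format are hypothetical; Beam generates its own transactional ids internally, which is why you should not need to set `transactional.id` yourself.

```java
import java.util.HashMap;
import java.util.Map;

class EosProducerProps {
  // Approximates the producer config withEOS(...) is assumed to use for one shard.
  // The transactional id format is hypothetical; Beam builds its own ids.
  static Map<String, Object> build(Map<String, Object> userConfig, String sinkGroupId, int shard) {
    // Copy the user's producer settings, then override the EOS-related keys.
    Map<String, Object> props = new HashMap<>(userConfig);
    props.put("enable.idempotence", true);
    props.put("transactional.id", sinkGroupId + "-shard-" + shard);
    return props;
  }

  public static void main(String[] args) {
    Map<String, Object> user = new HashMap<>();
    user.put("bootstrap.servers", "localhost:9092");
    // prints my-sink-group-shard-0
    System.out.println(build(user, "my-sink-group", 0).get("transactional.id"));
  }
}
```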
As a result, with these settings we get at-least-once semantics on read and exactly-once semantics on write.
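Putting the final settings together, a pipeline might look roughly like the sketch below. It assumes a recent Beam Java SDK with `beam-sdks-java-io-kafka` and `kafka-clients` on the classpath (older Beam versions used `updateConsumerProperties` instead of `withConsumerConfigUpdates`). The bootstrap servers, topic names, group ids and `numShards = 1` are placeholders, and `withEOS` only works on runners that support the exactly-once sink, so check your runner's documentation.

```java
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaEosPipeline {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(KafkaIO.<String, String>read()
            .withBootstrapServers("localhost:9092")   // placeholder
            .withTopic("input-topic")                 // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // Disable client auto-commit; offsets are committed on checkpoint finalize instead.
            .withConsumerConfigUpdates(Map.of(
                "group.id", "my-consumer-group",      // placeholder group id
                "enable.auto.commit", false))
            .withReadCommitted()                      // read only committed transactional records
            .commitOffsetsInFinalize()
            .withoutMetadata())                       // PCollection<KV<String, String>>
     .apply(KafkaIO.<String, String>write()
            .withBootstrapServers("localhost:9092")   // placeholder
            .withTopic("output-topic")                // placeholder
            .withKeySerializer(StringSerializer.class)
            .withValueSerializer(StringSerializer.class)
            // Transactional sink: numShards = 1, sink group id is a placeholder.
            .withEOS(1, "my-sink-group"));

    p.run().waitUntilFinish();
  }
}
```

Note that no producer-level `enable.idempotence` or `transactional.id` is set here; the answer above relies on `withEOS` to configure those.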