Reputation: 141
I'm building a project with Spring Boot 2, Kafka 2.2.0, and spring-kafka 2.2.5.
I configured Kafka for exactly-once semantics, and message producing and consuming work fine.
But kafka-consumer-groups.sh reports this:
TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
test_topic  0          23              24              1
test_topic  1          25              26              1
test_topic  2          21              22              1
I sent only one message to Kafka, but LOG-END-OFFSET increased by 2 and a lag of 1 always remains. (In my Java app, producing and consuming work as intended.)
I don't know why LOG-END-OFFSET goes up by 2. If I remove the exactly-once config, there is no problem with the LOG-END-OFFSET and CURRENT-OFFSET counts.
Here is my KafkaTemplate setup code:
@Bean
@Primary
public ProducerFactory<String, Object> producerFactory() {
    Map<String, Object> producerProperties = new HashMap<>();
    producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    // exactly-once producer setup
    producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    DefaultKafkaProducerFactory<String, Object> factory =
            new DefaultKafkaProducerFactory<>(producerProperties,
                    new StringSerializer(), new JsonSerializer<>(KafkaStaticOptions.OBJECT_MAPPER));
    factory.setTransactionIdPrefix("my.transaction.");
    return factory;
}

@Bean
@Primary
public KafkaTransactionManager<String, Object> kafkaTransactionManager(
        ProducerFactory<String, Object> producerFactory) {
    return new KafkaTransactionManager<>(producerFactory);
}

@Bean
@Primary
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
}
My producer code:
kafkaTemplate.executeInTransaction(kt -> kt.send("test_topic", "test data hahaha"));
I checked when LOG-END-OFFSET jumps by 2, and it happens at produce-transaction commit time.
What did I configure wrong?
Upvotes: 9
Views: 3002
Reputation: 26865
When using transactions, Kafka inserts "control batches" into the logs to indicate whether messages were part of a transaction.
These batches are also assigned offsets, which is why you see the offset increase by 2 even though you only sent a single record.
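The permanent lag of 1 falls out of the same bookkeeping: the consumer commits the offset right after the data record, which is the control batch's offset, and it never consumes the control batch itself. A simplified model of this (plain Java, not Kafka API) per committed single-record transaction:

```java
public class TransactionOffsetModel {
    public static void main(String[] args) {
        int logEndOffset = 0;    // next offset the broker will assign
        int committedOffset = 0; // consumer group's committed position

        // Each committed transaction of one record appends two entries:
        //   1. the data record itself
        //   2. a control batch (the commit marker)
        for (int txn = 0; txn < 3; txn++) {
            int recordOffset = logEndOffset++; // data record
            logEndOffset++;                    // control batch
            // The consumer reads the record and commits the next offset,
            // which is the control batch's offset; it never consumes it.
            committedOffset = recordOffset + 1;
        }

        System.out.println("LOG-END-OFFSET = " + logEndOffset);    // 6
        System.out.println("CURRENT-OFFSET = " + committedOffset); // 5
        System.out.println("LAG = " + (logEndOffset - committedOffset)); // 1
    }
}
```

So the log-end offset grows by 2 per transaction, the committed offset trails it by exactly the one control batch, and the reported lag stays at 1 even though the consumer has read everything.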
If you want to check for yourself, you can use the DumpLogSegments tool to display the content of your logs and see the control batch:
./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/mytopic-0/00000000000000000000.log
Dumping /tmp/kafka-logs/mytopic-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: 0 lastSequence: 0 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: false position: 0 CreateTime: 1558083247264 size: 10315 magic: 2 compresscodec: NONE crc: 3531536908 isvalid: true
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: true isControl: true position: 10315 CreateTime: 1558083247414 size: 78 magic: 2 compresscodec: NONE crc: 576574952 isvalid: true
I used the transactional producer to send a single record, and you can see that the 2nd entry has isControl: true.
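As a side note, these control batches are never handed to applications. A related consumer-side setting is isolation.level: with read_committed the consumer additionally skips records from aborted transactions. A minimal sketch, using plain string property keys (the broker address and group id are assumptions; real code would use the ConsumerConfig constants):

```java
import java.util.Properties;

public class ReadCommittedConsumerProps {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "test_group");              // hypothetical group id
        // Only deliver records from committed transactions;
        // the default is "read_uncommitted".
        props.put("isolation.level", "read_committed");
        System.out.println(props.getProperty("isolation.level"));
    }
}
```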
Upvotes: 12