Reputation: 4004
For the native Java Kafka client, there is a Kafka configuration called, enable.idempotence
and we can set it to be true
to enable idempotence producer.
However, for Spring Kafka, I can't find similar idempotence property in KafkaProperties
class.
So I am wondering, if I manually set in my Spring Kafka configuration file, whether this property will take effect or Spring will totally ignore this config for Spring Kafka?
Upvotes: 6
Views: 14858
Reputation: 1418
You're trying to add features that are not handled by Spring KafkaProperties, if you look at the documentation, you can do as the following:
Only a subset of the properties supported by Kafka are available directly through the KafkaProperties class.
If you wish to configure the producer or consumer with additional properties that are not directly supported, use the following properties:
spring.kafka.properties.prop.one=first
spring.kafka.admin.properties.prop.two=second
spring.kafka.consumer.properties.prop.three=third
spring.kafka.producer.properties.prop.four=fourth
spring.kafka.streams.properties.prop.five=fifth
Yannick
Upvotes: 4
Reputation: 40088
There are two ways to specify this property
application.properties You can use this property to specify any additional properties on producer
spring.kafka.producer.properties.*= # Additional producer-specific properties used to configure the client.
If you have any additional common config between Producer and Consumer
spring.kafka.properties.*= # Additional properties, common to producers and consumers, used to configure the client.
Through Code You can also override and customize the configs
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapAddress);
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Upvotes: 9
Reputation: 20880
You can find it with ProducerConfig
as it is producer configuration. In order to enable this, you need to add below line in producerConfigs:
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
Upvotes: 0