R K
R K

Reputation: 382

How can we enable log compaction using spring cloud stream?

How to set log.cleanup.policy: compact using spring configuration

Upvotes: 1

Views: 1779

Answers (3)

sobychacko
sobychacko

Reputation: 5924

Updating this answer based on Gary's and Artem's answers below to avoid any confusion.

You can pass any arbitrary kafka client configuration using the key spring.cloud.stream.kafka.binder.configuration.... However, since log.cleanup.policy is a broker level property, you cannot use it this way from the binder. You need to set it on the broker. Please see the answers below for more information.

Upvotes: 1

Gary Russell
Gary Russell

Reputation: 174769

log.cleanup.policy is a broker configuration (in server.properties), not a client property.

To change the policy for a single topic

kafka-topics --zookeeper localhost:2181 --alter --topic myTopic --config cleanup.policy=compact

or

kafka-configs --zookeeper localhost:2181 --entity-type=topics --entity-name=mytopic --alter --add-config cleanup.policy=compact

(since the first one is deprecated)

WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
     Going forward, please use kafka-configs.sh for this functionality

Upvotes: 2

Artem Bilan
Artem Bilan

Reputation: 121550

This property is a Broker Config: http://kafka.apache.org/documentation/#brokerconfigs. Therefore has to be configured on the broker side. There is nothing to do from the Spring Cloud Stream Kafka Binder perspective. It is just a client to the existing Apache Kafka broker.

If you talk about KafkaEmbedded from the Spring Kafka perspective, there are these options:

/**
 * Specify the properties to configure Kafka Broker before start, e.g.
 * {@code auto.create.topics.enable}, {@code transaction.state.log.replication.factor} etc.
 * @param brokerProperties the properties to use for configuring Kafka Broker(s).
 * @return this for chaining configuration
 * @see KafkaConfig
 */
public KafkaEmbedded brokerProperties(Map<String, String> brokerProperties) {
    this.brokerProperties.putAll(brokerProperties);
    return this;
}

/**
 * Specify a broker property.
 * @param property the property name.
 * @param value the value.
 * @return the {@link KafkaEmbedded}.
 * @since 2.1.4
 */
public KafkaEmbedded brokerProperty(String property, Object value) {
    this.brokerProperties.put(property, value);
    return this;
}

Upvotes: 2

Related Questions