Reputation: 654
I could not find in the documentation how to set the retention time when creating a producer using confluent-kafka.
If I just specify 'bootstrap.servers', the default retention time is 1 day. I would like to be able to change that.
(I want to do this in the Python API, not on the command line.)
Upvotes: 2
Views: 3925
Reputation: 191710
Retention time is set when you create a topic, not in the producer configuration.
If your server.properties allows for auto-topic creation, then you will get the defaults set in there.
Otherwise, you can use the AdminClient API to send a NewTopic request, which supports a config attribute of dict<str,str>:
from confluent_kafka.admin import AdminClient, NewTopic

# Placeholder broker address and topic name; substitute your own.
a = AdminClient({'bootstrap.servers': 'localhost:9092'})
topic = 'my-topic'

topics = list()
# Topic-level retention is retention.ms (log.retention.hours is a broker-level setting).
# 604800000 ms = 168 hours = 7 days.
t = NewTopic(topic, num_partitions=3, replication_factor=1, config={'retention.ms': '604800000'})
topics.append(t)

# Call create_topics to asynchronously create topics, a dict
# of <topic,future> is returned.
fs = a.create_topics(topics)

# Wait for operation to finish.
# Timeouts are preferably controlled by passing request_timeout=15.0
# to the create_topics() call.
# All futures will finish at the same time.
for topic, f in fs.items():
    try:
        f.result()  # The result itself is None
        print("Topic {} created".format(topic))
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic, e))
The same AdminClient API also provides a request for altering the configuration of an existing topic (alter_configs).
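For a topic that already exists, a minimal sketch of changing its retention from Python could look like the following. It assumes the client's `AdminClient.alter_configs()` call with a `ConfigResource`; the helper names `retention_ms` and `set_topic_retention` are made up for illustration. As far as I know, `alter_configs()` replaces the topic's whole set of overrides rather than merging into it, so any other per-topic settings you want to keep should be included in `set_config` too. Newer client versions may offer an incremental variant, so check the docs for your version.

```python
from datetime import timedelta


def retention_ms(**duration) -> str:
    """Convert timedelta-style keyword arguments (days=..., hours=...)
    into the millisecond string that the retention.ms setting expects."""
    return str(int(timedelta(**duration).total_seconds() * 1000))


def set_topic_retention(admin, topic, retention):
    """Set retention.ms on an existing topic (illustrative helper).

    `admin` is a confluent_kafka.admin.AdminClient instance.
    """
    # Imported here so retention_ms() stays usable without the client installed.
    from confluent_kafka.admin import ConfigResource

    resource = ConfigResource(
        ConfigResource.Type.TOPIC,
        topic,
        set_config={'retention.ms': retention},
    )
    # alter_configs returns a dict of <ConfigResource, future>.
    futures = admin.alter_configs([resource])
    for res, f in futures.items():
        f.result()  # Raises on failure; the result itself is None


print(retention_ms(days=7))
```

Usage would be something like `set_topic_retention(a, 'my-topic', retention_ms(days=7))`, where `a` is the AdminClient from the snippet above and `'my-topic'` is a placeholder name.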
Upvotes: 4
Reputation: 201
The retention time is not a property of the producer. The default retention time is set in the broker config file server.properties (e.g. /etc/kafka/server.properties, depending on your installation) through properties like log.retention.hours.
You can alter the retention time on a per-topic basis, e.g.:
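To see which default a broker would apply, here is a rough sketch that resolves the retention from the text of a server.properties file. It relies on Kafka's documented precedence (log.retention.ms over log.retention.minutes over log.retention.hours) and on the stock default of 168 hours when none is set. The function name `default_retention_ms` and the sample content are hypothetical.

```python
def default_retention_ms(properties_text: str) -> int:
    """Resolve the broker-wide default retention, in milliseconds,
    from the content of a server.properties file."""
    props = {}
    for line in properties_text.splitlines():
        line = line.strip()
        # Skip blank lines, comments, and anything that is not key=value.
        if not line or line.startswith('#') or '=' not in line:
            continue
        key, value = line.split('=', 1)
        props[key.strip()] = value.strip()

    # log.retention.ms wins over minutes, and minutes win over hours.
    if 'log.retention.ms' in props:
        return int(props['log.retention.ms'])
    if 'log.retention.minutes' in props:
        return int(props['log.retention.minutes']) * 60 * 1000
    # 168 hours is Kafka's stock default when nothing is configured.
    return int(props.get('log.retention.hours', '168')) * 60 * 60 * 1000


# Hypothetical excerpt of a broker config with a one-day default.
sample = """
# Log retention policy
log.retention.hours=24
"""
print(default_retention_ms(sample))
```

A per-topic retention.ms override takes priority over whatever this broker-wide default resolves to.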
$ <path-to-kafka>/bin/kafka-topics.sh --zookeeper <zookeeper-quorum> --alter --topic <topic-name> --config retention.ms=<your-desired-retention-in-ms>
HTH....
Upvotes: 0