I am trying to create a Python script to automate topic creation and configuration.
I have managed to make the topic creation part work, but I get a runtime error when trying to alter the configuration:
ValueError: expected non-empty list of ConfigEntry to alter incrementally in incremental_configs field
Here's my code:
config = {
    'min.insync.replicas': '3'
}
resource = ConfigResource(ResourceType.TOPIC, name=topic_name, set_config=config, described_configs=None)
futures = admin_client.incremental_alter_configs(resources=[resource])
for config_resource, future in futures.items():
    try:
        future.result()
        print(f'Updated topic config for topic {config_resource}')
    except Exception as exception:
        print(f'Failed to update topic config for topic {config_resource}, {exception}')
I found the documentation quite hard to follow.
I based this code on this example. From reading the docs, it seemed that altering the config could be done in a similar way to topic creation, but I am not sure what is wrong.
Upvotes: 1
Views: 369
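Some debug info helps to explain what the data structure of the ConfigEntry items should be. As the error message says, incremental_alter_configs expects a non-empty list of ConfigEntry objects in incremental_configs rather than a dict in set_config: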
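# Imports used by the snippets in this answer
from confluent_kafka.admin import (AlterConfigOpType, ConfigEntry,
                                   ConfigResource, ConfigSource, ResourceType)

# Print some debug info: the current config of the topic
config_resource = ConfigResource(ResourceType.TOPIC, name=topic_name)
futures = admin_client.describe_configs([config_resource])
for config_resource, future in futures.items():
    print(f'config_resource={config_resource}')
    try:
        dictionary = future.result()
        print(dictionary)
    except Exception as exception:
        print(f'Failed to describe config for topic {config_resource}, {exception}')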
# Alter the config: pass ConfigEntry objects via incremental_configs
resource = ConfigResource(
    ResourceType.TOPIC,
    name=topic_name,
    set_config=None,
    described_configs=None,
    incremental_configs=[
        ConfigEntry(
            name='min.insync.replicas',
            value='3',
            source=ConfigSource.DYNAMIC_TOPIC_CONFIG,
            incremental_operation=AlterConfigOpType.SET)
    ]
)
resources = [resource]
futures = admin_client.incremental_alter_configs(resources)
for config_resource, future in futures.items():
    try:
        future.result()
        print(f'Updated topic config for topic {config_resource}')
    except Exception as exception:
        print(f'Failed to update topic config for topic {config_resource}, {exception}')
If you want to change more than one config at once:
# Update topic config
config_entry_cleanup_policy = ConfigEntry(
    name='cleanup.policy',
    value='compact',
    source=ConfigSource.DYNAMIC_TOPIC_CONFIG,
    incremental_operation=AlterConfigOpType.SET)
config_entry_min_insync_replicas = ConfigEntry(
    name='min.insync.replicas',
    value='3',
    source=ConfigSource.DYNAMIC_TOPIC_CONFIG,
    incremental_operation=AlterConfigOpType.SET)
resource = ConfigResource(
    ResourceType.TOPIC,
    name=topic_name,
    set_config=None,
    described_configs=None,
    incremental_configs=[
        config_entry_cleanup_policy,
        config_entry_min_insync_replicas
    ]
)
resources = [resource]
futures = admin_client.incremental_alter_configs(resources)
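If you would rather keep the plain dict from the question, a small helper can turn it into ConfigEntry objects. This is only a sketch: entry_kwargs and build_topic_resource are names I made up, and it assumes a confluent-kafka version recent enough to have AlterConfigOpType and ConfigResource.add_incremental_config (the same versions that provide incremental_alter_configs). The source argument is left at its default here, which I believe is fine for a SET operation.

```python
def entry_kwargs(config):
    """Turn {'name': value} into keyword arguments for one ConfigEntry each."""
    # Config values are sent as strings, so an int such as 3 becomes '3'.
    return [{'name': name, 'value': str(value)} for name, value in config.items()]


def build_topic_resource(topic_name, config):
    """Build a ConfigResource that SETs every key in `config` on the topic."""
    # Imported here so entry_kwargs() can be used without the client installed.
    from confluent_kafka.admin import (AlterConfigOpType, ConfigEntry,
                                       ConfigResource, ResourceType)

    resource = ConfigResource(ResourceType.TOPIC, topic_name)
    for kwargs in entry_kwargs(config):
        resource.add_incremental_config(
            ConfigEntry(incremental_operation=AlterConfigOpType.SET, **kwargs))
    return resource


# Usage, assuming admin_client and topic_name exist as in the code above:
# resource = build_topic_resource(
#     topic_name, {'cleanup.policy': 'compact', 'min.insync.replicas': 3})
# futures = admin_client.incremental_alter_configs([resource])
```

The futures returned by incremental_alter_configs still need to be waited on with future.result(), as in the first loop.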
Upvotes: 1