user2138149

How does one alter a topic configuration using incremental_alter_configs?

I am trying to create a Python script to automate topic creation and configuration.

I have managed to make the topic creation part work. I get a runtime error when trying to alter the configuration.

ValueError: expected non-empty list of ConfigEntry to alter incrementally in incremental_configs field

Here's my code:

config = {
    'min.insync.replicas': '3'
}

resource = ConfigResource(ResourceType.TOPIC, name=topic_name, set_config=config, described_configs=None)

futures = admin_client.incremental_alter_configs(resources=[resource])

for config_resource, future in futures.items():
    try:
        future.result()
        print(f'Updated topic config for topic {config_resource}')
    except Exception as exception:
        print(f'Failed to update topic config for topic {config_resource}, {exception}')

I found the documentation quite hard to follow.

I based this code on this example. From reading the docs, it seemed that altering the config could be done in much the same way as topic creation, but I am not sure what is wrong.

Answers (1)

user2138149

Printing the topic's existing configuration with describe_configs helps show what the ConfigEntry items should look like:

# Print some debug info
config_resource = ConfigResource(ResourceType.TOPIC, name=topic_name)
futures = admin_client.describe_configs([config_resource])

for config_resource, future in futures.items():
    print(f'config_resource={config_resource}')
    try:
        dictionary = future.result()
        print(dictionary)
    except Exception as exception:
        print(f'Failed to describe configs for {config_resource}, {exception}')
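
Once the current values are visible, it can be useful to work out which settings actually need changing before calling incremental_alter_configs. The following is a minimal sketch, assuming the dictionary returned by future.result() maps config names to entries with a .value attribute. The helper name pending_changes is hypothetical, and SimpleNamespace objects stand in for real ConfigEntry instances so the snippet runs without a broker.

```python
from types import SimpleNamespace


def pending_changes(described, desired):
    """Return the subset of `desired` whose value differs from `described`.

    `described` is shaped like a describe_configs() result: name -> entry,
    where only the entry's `.value` attribute is read.
    `desired` is a plain dict of name -> string value.
    """
    changes = {}
    for name, wanted in desired.items():
        entry = described.get(name)
        current = entry.value if entry is not None else None
        if current != wanted:
            changes[name] = wanted
    return changes


# Stand-in data for illustration only; real entries come from the broker.
described = {
    'min.insync.replicas': SimpleNamespace(value='1'),
    'cleanup.policy': SimpleNamespace(value='compact'),
}
desired = {'min.insync.replicas': '3', 'cleanup.policy': 'compact'}

print(pending_changes(described, desired))
```

Only min.insync.replicas differs here, so it is the only setting that would need a SET operation.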

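Solution

incremental_alter_configs reads each resource's incremental_configs field, not set_config, so the settings must be passed as a non-empty list of ConfigEntry objects, each with an incremental_operation: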

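from confluent_kafka.admin import (
    AlterConfigOpType, ConfigEntry, ConfigResource, ConfigSource, ResourceType)

# Describe the change as a ConfigEntry with an explicit operation (SET)
resource = ConfigResource(
    ResourceType.TOPIC,
    name=topic_name,
    set_config=None,
    described_configs=None,
    incremental_configs=[
        ConfigEntry(
            name='min.insync.replicas',
            value='3',
            source=ConfigSource.DYNAMIC_TOPIC_CONFIG,
            incremental_operation=AlterConfigOpType.SET)
    ]
)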

resources = [resource]

futures = admin_client.incremental_alter_configs(resources)

for config_resource, future in futures.items():
    try:
        future.result()
        print(f'Updated topic config for topic {config_resource}')
    except Exception as exception:
        print(f'Failed to update topic config for topic {config_resource}, {exception}')

To set more than one config in a single call:

# Update topic config
config_entry_cleanup_policy = \
    ConfigEntry(
        name='cleanup.policy',
        value='compact',
        source=ConfigSource.DYNAMIC_TOPIC_CONFIG,
        incremental_operation=AlterConfigOpType.SET)

config_entry_min_insync_replicas = \
    ConfigEntry(
        name='min.insync.replicas',
        value='3',
        source=ConfigSource.DYNAMIC_TOPIC_CONFIG,
        incremental_operation=AlterConfigOpType.SET)

resource = \
    ConfigResource(
        ResourceType.TOPIC,
        name=topic_name,
        set_config=None,
        described_configs=None,
        incremental_configs=[
            config_entry_cleanup_policy,
            config_entry_min_insync_replicas
        ]
    )

resources = [resource]

futures = admin_client.incremental_alter_configs(resources)
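
The per-entry boilerplate can also be generated from a plain dict like the one in the question. This is a rough sketch rather than a definitive implementation: plan_ops and to_config_resource are hypothetical helper names, and treating a value of None as "delete the override" is a convention of this sketch, not of the library. It also relies on my understanding that ConfigEntry accepts incremental_operation without an explicit source. The confluent_kafka import is deferred into the function so the planning step can be run on its own.

```python
from typing import Dict, List, Optional, Tuple

# (config name, value, operation name). Operation names mirror members of
# confluent_kafka.admin.AlterConfigOpType (SET, DELETE, ...).
PlannedOp = Tuple[str, Optional[str], str]


def plan_ops(config: Dict[str, object]) -> List[PlannedOp]:
    """Turn {'name': value} into a list of planned operations.

    Assumption of this sketch: None means "remove the topic-level override".
    """
    ops: List[PlannedOp] = []
    for name, value in config.items():
        if value is None:
            ops.append((name, None, 'DELETE'))
        elif isinstance(value, bool):
            # Kafka expects lowercase 'true' / 'false'
            ops.append((name, str(value).lower(), 'SET'))
        else:
            # Config values are passed as strings
            ops.append((name, str(value), 'SET'))
    return ops


def to_config_resource(topic_name: str, config: Dict[str, object]):
    """Build a topic ConfigResource suitable for incremental_alter_configs."""
    # Deferred import: plan_ops above does not need the library.
    from confluent_kafka.admin import (
        AlterConfigOpType, ConfigEntry, ConfigResource, ResourceType)

    entries = [
        ConfigEntry(name, value, incremental_operation=AlterConfigOpType[op])
        for name, value, op in plan_ops(config)
    ]
    return ConfigResource(
        ResourceType.TOPIC, topic_name, incremental_configs=entries)


print(plan_ops({'min.insync.replicas': 3, 'cleanup.policy': 'compact'}))
```

With this in place, the call would look like admin_client.incremental_alter_configs([to_config_resource(topic_name, config)]), with the futures handled as in the loop shown earlier.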
