AustinTX
AustinTX

Reputation: 1352

changelog topic in Kafka streams - setting or altering partitions

We have a stream processor app that consumes the data from a topic with n partitions ( n > 1 ).

From the fresh start (without the changelog topic), the dev environment always creates a changelog topic with n partitions.

In the same scenario, on production the number of partitions is always equal to 1 then we have manually changed to n to match the # of partitions of the topic.

I check all the documents try to set the number of partitions for the changelog but I could not find any way to do it. My last option is to check if the changelog topic does not exist then I create it with n partitions.

Since the framework creates that topic automatically, are there any way to set the number of partitions for the changelog without creating that topic manually or in code?

PS: We are using Kafka client version 2.3.1.

Thanks,

Austin

Upvotes: 1

Views: 2702

Answers (3)

Abe
Abe

Reputation: 696

changelog partition number comes from source topic. It can't be configured.

sourceTopic -> group repartition -> changelog

Some properties can be configure using .withLoggingEnabled

retention.ms
segment.ms
...

Changlog name can be configured when Materialized. It applies to both changelog and RocksDB

Materialized.<String, MyAgg, KeyValueStore<Bytes, byte[]>>as("myChangLog")

Upvotes: 0

DEEPAK R.L.
DEEPAK R.L.

Reputation: 19

Currently we are connecting to SSL enabled MSK topics and as such we dont have write access to create internal topics via the app. Hence as a work around we had asked the MSK admins to create changelog topics with the desired name manually so that the app could read it.

Moreover, currently all our User topics are of 3 partitions and the changelog topics created have also 3 partition with the below updated settings. These settings would come in handy just in case you are trying to create a changelog topic manually (compaction is enabled to save space) :

Configs for Changelog

Moreover changelog topic names would look like this : (your-application-id)-(userDefined property under materializedAs)-changelog

Upvotes: 0

Lalit
Lalit

Reputation: 2014

I've just looked at the source code to find out the details of this feature and at the time of this writing, it turns out that setting the partitions of the change-logs topic is not allowed.

Explanation

The change-logs topic is classified as an internal topic and there is evidence for this in the following 2 classes (InternalTopicConfig and InternalTopicManager):

  1. The source code of the class InternalTopicConfig contains the following method that also says the number of partitions on such internal topics are enforced:

    public void setNumberOfPartitions(final int numberOfPartitions) {
    if (hasEnforcedNumberOfPartitions()) {
        throw new UnsupportedOperationException("number of partitions are enforced on topic " +
                                                "" + name() + " and can't be altered.");
    ...
    
  2. The embedded documentation in the source code of the class InternalTopicManager clearly states this for the makeReady() method.

    /**
    * Prepares a set of given internal topics.
    *
    * If a topic does not exist creates a new topic.
    * If a topic with the correct number of partitions exists ignores it.
    * If a topic exists already but has different number of partitions we fail and throw exception requesting user to reset the app before restarting again.
    * @return the set of topics which had to be newly created
    */
    public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) 
    ...
    

As you can see in the comments, if such a topic exists with correct partition count it will be ignored and if the partition count is incorrect then you'll see errors and the recommendation is to use the application reset tool.

Hope this helps!

Upvotes: 0

Related Questions