lollerskates

Reputation: 1147

How to edit replication factor when Debezium creates a new topic

I am using kafka-connect + Debezium to capture data changes in my MySQL cluster. Currently we use Debezium's table.whitelist to ingest only a very small subset of tables in that MySQL cluster. Whenever I add a new table to the table.whitelist config, Debezium creates a new Kafka topic with replication factor 2 and a single partition. My Kafka cluster has a total of 4 brokers (1 leader and 3 followers), and in my /opt/kafka-connect/worker.properties file I have set the following:

This means I have to manually rebalance the topic every time I add a new table, and it gets old real fast. [screenshot of the topic list] The last entry is the MySQL table that was recently added; notice that it is replicated on 2 brokers while the other topics are replicated on 3.
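The manual fix is typically a partition reassignment. Below is a minimal sketch of the JSON file that kafka-reassign-partitions.sh accepts through --reassignment-json-file (run with --execute) to raise the new topic to three replicas. The topic name mysql.schema.table5 (database.server.name followed by the whitelisted schema.table) and the broker IDs 1, 2 and 3 are assumptions. Substitute your own, since JSON has no comment syntax to flag them inline.

```json
{
  "version": 1,
  "partitions": [
    { "topic": "mysql.schema.table5", "partition": 0, "replicas": [1, 2, 3] }
  ]
}
```

Only partition 0 is listed because the auto-created topic has a single partition. A topic with more partitions needs one entry per partition.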

Am I editing the wrong config file or using the wrong settings? Please halp!

Below is my Debezium config

{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "snapshot.locking.mode": "none",
  "max.queue.size": "300000",
  "tasks.max": "8",
  "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required   username=\"<USER>\"   password=\"<PASSWORD>\";",
  "database.history.kafka.topic": "dbhistory_mysql",
  "database.history.consumer.ssl.truststore.password": "<PASSWORD>",
  "database.history.consumer.security.protocol": "SASL_SSL",
  "database.history.consumer.ssl.truststore.location": "/PATH/TO/TRUSTSTORE.jks",
  "table.whitelist": "schema.table1,schema.table2,schema.table3,schema.table4,schema.table5",
  "decimal.handling.mode": "string",
  "database.history.kafka.recovery.poll.interval.ms": "30000",
  "database.history.producer.sasl.mechanism": "PLAIN",
  "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required   username=\"<USER>\"   password=\"<PASSWORD>\";",
  "database.user": "debezium",
  "database.server.id": "123",
  "database.history.producer.security.protocol": "SASL_SSL",
  "database.history.kafka.bootstrap.servers": "broker1.com:9092,broker2.com:9092,broker3.com:9092,broker4.com:9092",
  "database.history.producer.ssl.truststore.location": "/PATH/TO/TRUSTSTORE.jks",
  "database.server.name": "mysql",
  "database.port": "3306",
  "database.serverTimezone": "US/Pacific",
  "database.history.producer.ssl.truststore.password": "<PASSWORD>",
  "database.hostname": "mysql.cluster.amazonaws.com",
  "database.password": "<PASSWORD>",
  "name": "mysql-connector",
  "max.batch.size": "20480",
  "database.history.consumer.sasl.mechanism": "PLAIN"
}

Upvotes: 1

Views: 2556

Answers (2)

Achyut Vyas

Reputation: 501

You can specify the topics' replication factor and partition count in the Debezium connector configuration.

Ref

Example: add the following lines to your Debezium connector config:

"topic.creation.default.replication.factor": "3",
"topic.creation.default.partitions": "60"
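The fragment below sketches how these settings might be merged into the connector JSON from the question. It assumes Kafka Connect 2.6 or later, where topic creation by source connectors was added (KIP-158), and that the worker's topic.creation.enable has not been turned off (it defaults to true). The group name cdc and its include regex are hypothetical. They show how to give only the Debezium table topics their own settings, and the partition count of 1 is illustrative.

```json
{
  "topic.creation.default.replication.factor": "3",
  "topic.creation.default.partitions": "1",
  "topic.creation.groups": "cdc",
  "topic.creation.cdc.include": "mysql\\.schema\\..*",
  "topic.creation.cdc.replication.factor": "3",
  "topic.creation.cdc.partitions": "1"
}
```

These settings apply only to topics that Connect creates after the change. Existing topics keep their current replication factor.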

Upvotes: 1

OneCricketeer

Reputation: 191864

Those three properties configure Kafka Connect's internal topics (config, offset and status storage), not auto-created user topics.

For auto-created topics (keeping broker auto-creation disabled is recommended), you set the default partition count and replication factor in the broker's server.properties:

num.partitions
default.replication.factor

I believe these are also dynamic settings that can be overridden using the kafka-configs tooling, maybe with --alter --entity-type brokers --entity-default, but it is probably easier to reason about if the property is actually in a config file.

Upvotes: 1

Related Questions