Reputation: 1147
I am using Kafka Connect + Debezium to capture data changes in my MySQL cluster. Currently we use Debezium's table.whitelist
to ingest only a very small subset of tables in that MySQL cluster. Whenever I add a new table to the table.whitelist
config, Debezium creates a new Kafka topic with replication_factor = 2
and partitions = 1
. My Kafka cluster has a total of 4 brokers, and in my /opt/kafka-connect/worker.properties
file I have set the following:
offset.storage.replication.factor=4
config.storage.replication.factor=4
status.storage.replication.factor=4
This results in me having to manually re-balance the topic every time I add a new table, and it gets old real fast. The last entry is the newly added MySQL table; notice that it is replicated on only 2 brokers while the other topics are replicated on 3.
Am I editing the wrong config file or using the wrong settings? Please help!
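The manual re-balance step looks roughly like this (a sketch: the topic name `mysql.schema.table5` and broker IDs 1, 2, 3 are placeholders, and `--bootstrap-server` on this tool requires Kafka 2.5+; older versions use `--zookeeper`):

```shell
# reassign.json - raise the new topic's replication factor from 2 to 3
# (topic name and broker IDs are placeholders)
cat > reassign.json <<'EOF'
{"version": 1,
 "partitions": [
   {"topic": "mysql.schema.table5", "partition": 0, "replicas": [1, 2, 3]}
 ]}
EOF

kafka-reassign-partitions.sh --bootstrap-server broker1.com:9092 \
  --reassignment-json-file reassign.json --execute
```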
Below is my Debezium config
{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"snapshot.locking.mode": "none",
"max.queue.size": "300000",
"tasks.max": "8",
"database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<USER>\" password=\"<PASSWORD>\";",
"database.history.kafka.topic": "dbhistory_mysql",
"database.history.consumer.ssl.truststore.password": "<PASSWORD>",
"database.history.consumer.security.protocol": "SASL_SSL",
"database.history.consumer.ssl.truststore.location": "/PATH/TO/TRUSTSTORE.jks",
"table.whitelist": "schema.table1,schema.table2,schema.table3,schema.table4,schema.table5",
"decimal.handling.mode": "string",
"database.history.kafka.recovery.poll.interval.ms": "30000",
"database.history.producer.sasl.mechanism": "PLAIN",
"database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<USER>\" password=\"<PASSWORD>\";",
"database.user": "debezium",
"database.server.id": "123",
"database.history.producer.security.protocol": "SASL_SSL",
"database.history.kafka.bootstrap.servers": "broker1.com:9092,broker2.com:9092,broker3.com:9092,broker4.com:9092",
"database.history.producer.ssl.truststore.location": "/PATH/TO/TRUSTSTORE.jks",
"database.server.name": "mysql",
"database.port": "3306",
"database.serverTimezone": "US/Pacific",
"database.history.producer.ssl.truststore.password": "<PASSWORD>",
"database.hostname": "mysql.cluster.amazonaws.com",
"database.password": "<PASSWORD>",
"name": "mysql-connector",
"max.batch.size": "20480",
"database.history.consumer.sasl.mechanism": "PLAIN"
}
Upvotes: 1
Views: 2556
Reputation: 501
You can specify the topic replication factor and partition count in the Debezium connector configuration.
Add the lines below to your Debezium connector config:
"topic.creation.default.replication.factor": "3",
"topic.creation.default.partitions": "60"
Upvotes: 1
Reputation: 191864
Those three properties configure Connect's internal topics, not auto-created user topics.
For auto-created topics (and auto-creation is generally recommended to be disabled), you set the default partition count and replication factor in the broker's server.properties:
num.partitions
default.replication.factor
I believe these can also be overridden dynamically using the kafka-configs
tooling... maybe with --alter --entity-type brokers --entity-default
... but it is probably easier to reason about if the property is actually in a config file.
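A rough sketch of the dynamic approach (assuming your Kafka version exposes these as cluster-wide dynamic broker configs; check with --describe first):

```shell
# Inspect the current cluster-wide dynamic defaults
kafka-configs.sh --bootstrap-server broker1.com:9092 \
  --describe --entity-type brokers --entity-default

# Attempt to set defaults for newly auto-created topics.
# NOTE: whether default.replication.factor / num.partitions are dynamically
# settable depends on your Kafka version; the tool will reject them if not.
kafka-configs.sh --bootstrap-server broker1.com:9092 \
  --alter --entity-type brokers --entity-default \
  --add-config default.replication.factor=3,num.partitions=1
```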
Upvotes: 1