japoneizo

Reputation: 548

Updating a Debezium MySQL connector with table whitelist option

I'm using the Debezium (0.7.5) MySQL connector and I'm trying to work out the best way to update its configuration when the table.whitelist option is set.

Let's say I create a connector, something like this:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://debezium-host/connectors/ -d '
{
  "name": "MyConnector",
  "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "connect.timeout.ms": "60000",
      "tasks.max": "1",
      "database.hostname": "myhost",
      "database.port": "3306",
      "database.user": "***",
      "database.password": "***",
      "database.server.id": "3227197",
      "database.server.name": "MyServer",
      "database.whitelist": "myDb",
      "table.whitelist": "myDb.table1,myDb.table2",
      "database.history.kafka.bootstrap.servers": "kb0:9092,kb1:9092,kb2:9092",
      "database.history.kafka.topic": "MyConnectorHistoryTopic",
      "max.batch.size": "1024",
      "snapshot.mode": "initial",
      "decimal.handling.mode": "double"
    }
}'

After some time (2 weeks), I need to add a new table (myDb.table3) to the table.whitelist option. This table is an old one; it was created before the connector.

What I tried was:

Update command via API:

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" https://kafka-connect-host/connectors/MyConnector/config/ -d '
{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "connect.timeout.ms": "60000",
  "tasks.max": "1",
  "database.hostname": "myhost",
  "database.port": "3306",
  "database.user": "***",
  "database.password": "***",
  "database.server.id": "3227197",
  "database.server.name": "MyServer",
  "database.whitelist": "myDb",
  "table.whitelist": "myDb.table1,myDb.table2,myDb.table3",
  "database.history.kafka.bootstrap.servers": "kb0:9092,kb1:9092,kb2:9092",
  "database.history.kafka.topic": "MyConnectorHistoryTopic",
  "max.batch.size": "1024",
  "snapshot.mode": "schema_only",
  "decimal.handling.mode": "double"
}'

But it didn't work, and maybe this isn't the best approach at all. In other connectors I'm not using the table.whitelist option, so when I needed to listen to a new table, I didn't have this problem.

My last option, I think, would be to delete this connector and create another one with the new configuration that also listens to the new table (myDb.table3). The problem is that if I want the initial data from myDb.table3, I would have to create it with snapshot.mode set to initial, but I don't want to regenerate all the snapshot messages for the other tables (myDb.table1 and myDb.table2).

Upvotes: 12

Views: 16181

Answers (3)

I had the same problem and solved it with a Debezium signal table. It works like this: you create a table in your database through which you send commands to Debezium.

CREATE TABLE public.debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32)  NULL, data VARCHAR(2048)  NULL);

Then set the property "signal.data.collection": "public.debezium_signal" in your Debezium configuration.

After that you can send commands by inserting rows into that table:

INSERT INTO debezium_signal (id, type, data)
VALUES(gen_random_uuid(),'execute-snapshot','{"data-collections": ["myDb.table3"]}');

In my case I also had to add the signal table to table.include.list and its columns to column.include.list.

https://debezium.io/documentation/reference/stable/configuration/signalling.html
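
As a rough sketch of the step above, the signal row can be built programmatically so that the data column is always valid JSON with an array of tables. The function name build_snapshot_signal and the %s placeholder style are assumptions for illustration; the placeholder syntax depends on your database driver.

```python
import json
import uuid


def build_snapshot_signal(tables, snapshot_type=None):
    """Return (id, type, data) for an execute-snapshot signal row.

    build_snapshot_signal is a hypothetical helper, not part of Debezium.
    """
    payload = {"data-collections": list(tables)}
    if snapshot_type is not None:
        # Optional; only included when the caller asks for it.
        payload["type"] = snapshot_type
    # A UUID string is 36 characters, which fits the VARCHAR(42) id column.
    return (str(uuid.uuid4()), "execute-snapshot", json.dumps(payload))


# Parameterized statement; the %s placeholder style is an assumption.
INSERT_SQL = "INSERT INTO debezium_signal (id, type, data) VALUES (%s, %s, %s)"

row = build_snapshot_signal(["myDb.table3"])
print(INSERT_SQL)
print(row)
```

Passing the row as parameters to your driver's execute call, rather than concatenating strings, avoids quoting mistakes in the JSON payload.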

Upvotes: 2

In the latest version of Debezium Server, you can add the following config:

debezium.snapshot.new.tables=parallel

If you are using plain Debezium rather than Debezium Server, you can try this config value:

snapshot.new.tables=parallel

Note: Debezium Server is the one that supports Kinesis, Google Pub/Sub, and Apache Pulsar. I am using it, and its configuration is a bit different: I had to prepend "debezium." to each item.

Once this configuration is added, Debezium will create snapshots for any tables subsequently added to table.whitelist.
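
A minimal sketch of preparing such an updated configuration, assuming the option behaves as described in this answer. The helper with_new_table is hypothetical; it only builds the dictionary you would then send in the PUT request to the connector's config endpoint.

```python
def with_new_table(config, table):
    """Return a copy of a connector config with `table` added to table.whitelist.

    with_new_table is a hypothetical helper, not part of Debezium.
    """
    updated = dict(config)
    current = config.get("table.whitelist", "")
    tables = [t.strip() for t in current.split(",") if t.strip()]
    if table not in tables:
        tables.append(table)
    updated["table.whitelist"] = ",".join(tables)
    # Option taken from this answer; its behavior is not verified here.
    updated["snapshot.new.tables"] = "parallel"
    return updated


old_config = {"table.whitelist": "myDb.table1,myDb.table2"}
new_config = with_new_table(old_config, "myDb.table3")
print(new_config["table.whitelist"])
```

The original dictionary is left untouched, so you can compare the old and new configurations before sending the update.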

I cannot point you to the documentation, but I went through their code on GitHub and also tried it in practice, and it worked for me. Here is the link to the MySqlConnector code:

https://github.com/debezium/debezium/blob/master/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorConfig.java

There, search for Field.create("snapshot.new.tables").

Personally, I feel that Debezium has a lot of features, but its documentation is scattered.

Upvotes: 4

Gunnar

Reputation: 19010

Changes to the whitelist/blacklist config are not yet supported at this point. This is currently being worked on (see DBZ-175), and we hope to have preview support for this in one of the next releases. There's a pending PR for this, which needs a bit more work, though.

Until this has been implemented, your best option is to set up a new instance of the connector which only captures the additional tables you're interested in. This comes at the price of running two connectors (which both will maintain a binlog reader session), but it does the trick as long as you don't need to change your filter config too often.
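
A rough sketch of deriving the request body for such a second connector from the first one's config. The helper name, the "Extra" suffix, and the "+1" server ID are illustrative assumptions: the second connector needs its own connector name, its own database.server.id, its own logical server name, and its own history topic. Note that a different database.server.name also changes the prefix of the output topic names.

```python
def second_connector_body(base_config, new_tables, name="MyConnectorExtra", suffix="Extra"):
    """Build a POST body for a second connector capturing only `new_tables`.

    second_connector_body, the name and the suffix are hypothetical.
    """
    cfg = dict(base_config)
    cfg["table.whitelist"] = ",".join(new_tables)
    # Snapshot only the newly captured tables from scratch.
    cfg["snapshot.mode"] = "initial"
    # Assumption: base ID + 1 is unused; pick any ID unique in your MySQL cluster.
    cfg["database.server.id"] = str(int(base_config["database.server.id"]) + 1)
    cfg["database.server.name"] = base_config["database.server.name"] + suffix
    cfg["database.history.kafka.topic"] = base_config["database.history.kafka.topic"] + suffix
    return {"name": name, "config": cfg}


base = {
    "database.server.id": "3227197",
    "database.server.name": "MyServer",
    "database.history.kafka.topic": "MyConnectorHistoryTopic",
    "table.whitelist": "myDb.table1,myDb.table2",
    "snapshot.mode": "schema_only",
}
body = second_connector_body(base, ["myDb.table3"])
print(body["config"]["table.whitelist"])
```

The resulting dictionary can be serialized to JSON and posted to the /connectors/ endpoint in the same way as the original connector.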

Upvotes: 4
