Dyan

Reputation: 81

How to perform Ad-Hoc Snapshot with Debezium SQL Server Source Connector

I've been following the documentation at https://debezium.io/documentation/reference/2.4/connectors/sqlserver.html#sqlserver-ad-hoc-snapshots

But when I registered the source connector to perform an ad-hoc snapshot of the 'dbo.customers' table with the 'WHERE' condition 'last_name'='Walker', by writing '{"data-collections": ["dbo.customers"],"type":"incremental","additional-conditions":"last_name=Walker"}' to the signaling table, the connector still captured and snapshotted all rows from the 'customers' table, not the 1 row I expected.

I don't know which step I missed.

Here are my configuration steps:

  1. Populate DB
CREATE DATABASE testDB;
GO
USE testDB;
EXEC sys.sp_cdc_enable_db;
CREATE TABLE customers (
  id INTEGER IDENTITY(1001,1) NOT NULL PRIMARY KEY,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL UNIQUE
);
INSERT INTO customers(first_name,last_name,email)
  VALUES ('Sally','Thomas','[email protected]');
INSERT INTO customers(first_name,last_name,email)
  VALUES ('George','Bailey','[email protected]');
INSERT INTO customers(first_name,last_name,email)
  VALUES ('Edward','Walker','[email protected]');
INSERT INTO customers(first_name,last_name,email)
  VALUES ('Anne','Kretchmar','[email protected]');
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0;
  2. Create signaling table & add a signal record
CREATE TABLE debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
INSERT INTO dbo.debezium_signal (id, type, data) 
VALUES ('ad-hoc-1','execute-snapshot','{"data-collections": ["dbo.customers"],"type":"incremental","additional-conditions":"last_name=Walker"}');
  3. Configure the source connector and start it.
{
    "name": "customer-adhoc",
    "config": {
        "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max" : "1",
        "topic.prefix" : "CDC",
        "database.hostname" : "sqlserver12",
        "database.port" : "1433",
        "database.user" : "sa",
        "database.password" : "Password!",
        "database.names" : "testDB",
        "snapshot.mode": "initial",
        "schema.history.internal.kafka.bootstrap.servers" : "kafka12:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory",
        "include.schema.changes": "true",
        "database.encrypt": "false",
        "table.include.list": "dbo.customers,dbo.debezium_signal",
        "column.mask.with.0.chars": "testDB.dbo.customers.first_name, testDB.dbo.customers.last_name",
        "schema.history.internal.store.only.captured.tables.ddl": "true",
        "schema.history.internal.store.only.captured.databases.ddl": "true",
        "incremental.snapshot.allow.schema.changes" : "true" ,
        "key.converter.apicurio.registry.auto-register": "true",
        "key.converter.apicurio.registry.find-latest": "true",
        "value.converter.apicurio.registry.auto-register": "true",
        "value.converter.apicurio.registry.find-latest": "true",
        "schema.name.adjustment.mode": "avro",
        "value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
        "value.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy",
        "key.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy",
        "key.converter.apicurio.registry.url": "http://****:8080/apis/registry/v2",
        "value.converter.apicurio.registry.url": "http://****:8080/apis/registry/v2",
        "signal.data.collection": "testDB.dbo.debezium_signal",
        "signal.kafka.topic":"CDC.dbz-signal",
        "kafka.consumer.offset.commit.enabled": "true",
        "signal.kafka.groupId": "customer-kafka-signal",
        "signal.kafka.bootstrap.servers": "kafka12:9092"
    }
}

Upvotes: 0

Views: 1673

Answers (2)

Dyan

Reputation: 81

I solved my issue. Thanks to Artem and Panagiotis Kanavos. When the connector's "snapshot.mode" is set to "initial", all rows in the table are captured. A snapshot can be taken of a subset of a source table's data, as mentioned before. What caused the full snapshot was "initial" (I was careless and didn't pay attention to this option).

So, here's my procedure:

  1. Set "snapshot.mode" to "schema_only". (Only changes from now on will be captured.)
  2. Restart the connector.
  3. Add a signal record to the signaling table.

The ad-hoc snapshot works like a charm.

Upvotes: 0

Artem

Reputation: 16

There are 2 types of Debezium snapshot:

  • Initial snapshot, controlled by the snapshot.mode parameter. You have it set to initial, which takes a full snapshot when the connector starts for the first time (with no stored offsets). That's why you have all rows captured at the beginning.
  • Incremental ad hoc snapshot, triggered through the signal table. Each time you insert a signal, you get the requested set of data. So it is easy to check: just insert a request into the signal table while the connector is running. If it does not work, review the format of your request. For instance, "additional-conditions" should follow the format given in the documentation. Or you can first insert a request without any conditions and then, if that works, add "additional-conditions".
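
As a rough sketch of that two-step check, the signals might look like the following. It assumes the question's testDB database and dbo.debezium_signal table; the ids 'ad-hoc-2' and 'ad-hoc-3' are made up for illustration. It also assumes, from my reading of the 2.4 documentation, that "data-collections" takes fully-qualified names (database.schema.table) and that "additional-conditions" is an array of objects with "data-collection" and "filter" keys, so please verify both against the docs for your version.

```sql
USE testDB;

-- Step 1: request an incremental snapshot with no filter,
-- just to confirm that the signal is picked up at all.
INSERT INTO dbo.debezium_signal (id, type, data)
VALUES ('ad-hoc-2', 'execute-snapshot',
  '{"data-collections": ["testDB.dbo.customers"], "type": "incremental"}');

-- Step 2: once step 1 works, add the filter.
-- The value is a string literal in the generated query, so it needs quotes;
-- inside a T-SQL string each single quote is doubled ('').
INSERT INTO dbo.debezium_signal (id, type, data)
VALUES ('ad-hoc-3', 'execute-snapshot',
  '{"data-collections": ["testDB.dbo.customers"], "type": "incremental", "additional-conditions": [{"data-collection": "testDB.dbo.customers", "filter": "last_name=''Walker''"}]}');
```

If the first signal produces read events for every row and the second only for the Walker row, the request format should be fine.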

Upvotes: 0
