Reputation: 541
We are enabling CDC on specific tables in our MSSQL database. Our pipeline migrates data through MSSQL -> CDC -> Debezium -> Kafka Connect.
One table has more than a million rows, but we need only a few thousand of them to be included in the snapshot created when CDC is enabled. The reason I don't want to handle this in our Kafka consumer is that only about 1% of the data needs to be written to Mongo; the other 99% would hit the consumer without any use. A sketch of our source connector config is shown below.
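For reference, the source side looks roughly like this (a minimal sketch; hostnames, credentials, and the table name are placeholders, and exact property names vary by Debezium version):

{
  "name": "mssql-source",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.hostname": "mssql-host",
    "database.port": "1433",
    "database.user": "debezium",
    "database.password": "******",
    "database.dbname": "inventory",
    "database.server.name": "mssql",
    "table.include.list": "dbo.big_table",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}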
Questions: Is there a way to restrict which rows are included in the snapshot, so that only the required subset ever reaches Kafka and the consumer?
Upvotes: 2
Views: 1091
Reputation: 39880
You can use a Kafka Connect Single Message Transform (SMT). More precisely, you need the Filter SMT:

The filter.condition is a predicate, specified as a JSON path, that is applied to each record processed; when the predicate successfully matches, the record is either included (when filter.type=include) or excluded (when filter.type=exclude).
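For example, suppose a record's value looks like the following flat row (field names here are illustrative; this shape assumes the Debezium envelope has been flattened, e.g. with Debezium's ExtractNewRecordState transform, since in the raw envelope the row fields sit under "after"):

{
  "id": 42,
  "product": "widget",
  "modified_date": "2/15/2020"
}

The predicate $.value[?(@.modified_date > "1/1/2020")] matches this record, so with filter.type=include it would be kept. Note that the comparison is done on strings, so with M/D/YYYY dates the ordering is lexicographic rather than chronological; a sortable format such as ISO 8601 (2020-02-15) makes the condition behave as expected.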
In your case, you can include rows that satisfy your desired condition:
transforms=filterExample1
transforms.filterExample1.type=io.confluent.connect.transforms.Filter$Value
transforms.filterExample1.filter.condition=$.value[?(@.modified_date > "1/1/2020")]
transforms.filterExample1.filter.type=include
transforms.filterExample1.missing.or.null.behavior=fail
Alternatively, you can decide which rows to exclude:
transforms=filterExample1
transforms.filterExample1.type=io.confluent.connect.transforms.Filter$Value
transforms.filterExample1.filter.condition=$.value[?(@.modified_date <= "1/1/2020")]
transforms.filterExample1.filter.type=exclude
transforms.filterExample1.missing.or.null.behavior=fail
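If you manage the connector through the Kafka Connect REST API rather than a properties file, the same settings become keys in the connector's "config" object; only the transform-related keys are shown here, with the connection settings from the source config in the question unchanged:

"transforms": "filterExample1",
"transforms.filterExample1.type": "io.confluent.connect.transforms.Filter$Value",
"transforms.filterExample1.filter.condition": "$.value[?(@.modified_date > \"1/1/2020\")]",
"transforms.filterExample1.filter.type": "include",
"transforms.filterExample1.missing.or.null.behavior": "fail"

One caveat: this Filter SMT comes from Confluent's connect-transforms plugin, which must be installed on your Connect workers. It is not the same as the org.apache.kafka.connect.transforms.Filter that ships with Apache Kafka, which can only drop records and has to be combined with a separate predicate.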
Upvotes: 3