Reputation: 11
I'm building a streaming change data capture (CDC) pipeline with Kafka Connect. Changes originate in MySQL, are published to the corresponding Kafka topic, and are then applied to a ClickHouse table by the ClickHouse Kafka Connect Sink. INSERT and UPDATE operations are processed correctly, but the sink connector cannot process DELETE operations, both according to the documentation (https://clickhouse.com/docs/en/integrations/kafka/clickhouse-kafka-connect-sink#limitations:~:text=Deletes%20are%20not%20supported) and in my own experience.
Is there a workaround for this?
I tried to solve it with the InsertField transformation and a RecordIsTombstone predicate in the MySQL connector, like this:
{
  "name": "mysql-source-connector",
  ...,
  "predicates": "IsTombstone",
  "predicates.IsTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
  "transforms": "MarkAsDeleted",
  "transforms.MarkAsDeleted.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.MarkAsDeleted.static.field": "is_deleted",
  "transforms.MarkAsDeleted.static.value": 1,
  "transforms.MarkAsDeleted.predicate": "IsTombstone"
}
Here, if RecordIsTombstone returns true, the is_deleted field is inserted with the value 1 (otherwise nothing is done, because the is_deleted column in the ClickHouse table defaults to 0). On the ClickHouse side we then build a materialized view and delete from it WHERE is_deleted = 1. But this approach had no effect, because ClickHouse Kafka Connect Sink simply ignores every record in the Kafka topic that carries the key-value pair "op": "d" (that is, the delete operation).
Upvotes: 0
Views: 475
Reputation: 11
So I solved my problem like this:
This works only with native Debezium connectors, because the ExtractNewRecordState transformation is available only there. After that, all that is needed on the ClickHouse side is DELETE FROM...WHERE __deleted=true.
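The configuration this answer refers to is not shown in the post. As a rough sketch only, assuming a Debezium MySQL source connector registered through the Kafka Connect REST API, the ExtractNewRecordState setup might look like the fragment below. The transform alias `unwrap` is an arbitrary name chosen here, the connection settings (host, credentials, topic prefix, and so on) are omitted, and the two delete-related options are the ones documented for older Debezium releases; newer releases appear to replace them with a single `delete.tombstone.handling.mode` option, so check the documentation for your version.

```json
{
  "name": "mysql-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.unwrap.delete.handling.mode": "rewrite"
  }
}
```

With `rewrite`, a delete event should be flattened into an ordinary record that keeps the row's last known values and adds a `__deleted` field, which the sink can insert like any other row. Dropping tombstones keeps null-valued records out of the topic.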
Upvotes: 1
Reputation: 522
You will need to use a ReplacingMergeTree. This requires you to:
Rows will be uniquely identified using the ORDER BY columns of the table.
Details: https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree
Upvotes: 2