Golikov Andrey

Handling DELETE operations with ClickHouse Kafka Connect Sink

I'm working on a project to build streaming change data capture (CDC) with Kafka Connect. The source of changes is MySQL; the changes are sent to the corresponding Kafka topic and then applied to a table in ClickHouse using the ClickHouse Kafka Connect Sink. INSERT and UPDATE operations are processed normally, but, according to the documentation (https://clickhouse.com/docs/en/integrations/kafka/clickhouse-kafka-connect-sink#limitations:~:text=Deletes%20are%20not%20supported) and my own experience, this sink connector cannot process DELETE operations.

Is there any way to solve this problem?

I tried to solve this using the InsertField transformation with the RecordIsTombstone predicate in the MySQL source connector config:

{
    "name": "mysql-source-connector",    
    ...,
    "predicates": "IsTombstone",
    "predicates.IsTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
    "transforms": "MarkAsDeleted",
    "transforms.MarkAsDeleted.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.MarkAsDeleted.static.field": "is_deleted",
    "transforms.MarkAsDeleted.static.value": 1,
    "transforms.MarkAsDeleted.predicate": "IsTombstone"
}

Here, if RecordIsTombstone returns true, the is_deleted field is inserted with the value 1 (otherwise nothing is done, because the is_deleted column in the ClickHouse table defaults to 0). On the ClickHouse side, we then build a materialized view that deletes rows WHERE is_deleted = 1. However, this did not help, because the ClickHouse Kafka Connect Sink simply ignores all records in the Kafka topic that have "op": "d" (i.e. delete operations).
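One likely reason the predicate approach above has no effect can be shown with a small simplified model. This is not Debezium or Kafka Connect code: it assumes the MySQL source is the Debezium connector (the "op": "d" field suggests so) with its default tombstones.on.delete=true, and the record dicts and function names are hypothetical, trimmed-down stand-ins. For one DELETE, Debezium emits an event carrying the old row with "op": "d", followed by a tombstone (same key, null value). RecordIsTombstone only matches the second record.

```python
from typing import Any

Record = dict[str, Any]  # simplified: {"key": ..., "value": ... or None}


def debezium_delete_records(row: dict[str, Any]) -> list[Record]:
    """Hypothetical, trimmed-down model of what Debezium emits for one DELETE
    (with tombstones.on.delete=true): a delete event, then a tombstone."""
    key = {"id": row["id"]}
    delete_event = {"key": key, "value": {"before": row, "after": None, "op": "d"}}
    tombstone = {"key": key, "value": None}
    return [delete_event, tombstone]


def record_is_tombstone(record: Record) -> bool:
    """Same rule as the RecordIsTombstone predicate: the value is null."""
    return record["value"] is None


records = debezium_delete_records({"id": 42, "name": "alice"})
print([record_is_tombstone(r) for r in records])  # [False, True]
# Only the tombstone matches, and it has no value to add is_deleted to;
# the "op": "d" event reaches the sink untouched.
```

So the is_deleted marker would never land on the record that actually carries the deleted row's data.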

Answers (2)

Golikov Andrey

So I solved my problem like this:

  1. In the ClickHouse table, create a __deleted column of boolean type;
  2. In the config used to register the source connector, add the ExtractNewRecordState and Cast SMTs (Single Message Transformations);
  3. For the ExtractNewRecordState SMT, set drop.tombstones to false and delete.handling.mode to rewrite;
  4. Use Cast to convert the __deleted field to boolean (ExtractNewRecordState emits it as a string by default).
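The steps above can be sketched as follows. The config dict is the Kafka Connect REST payload with the transform properties as spelled in the Debezium documentation (drop.tombstones, delete.handling.mode=rewrite) and the Kafka Connect Cast SMT; database connection settings are omitted, and the alias names unwrap and castDeleted are arbitrary. The two functions are a simplified, hypothetical model of what the SMTs do to a delete event, not their real implementation; check the property names against the docs for your Debezium version.

```python
import json
from typing import Any, Optional

Row = dict[str, Any]

# Kafka Connect REST payload for the source connector. The name comes from the
# question; database connection settings are omitted. Transform property names
# follow the Debezium and Kafka Connect documentation.
config = {
    "name": "mysql-source-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "transforms": "unwrap,castDeleted",
        # Flatten the Debezium envelope; keep tombstones; add __deleted.
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.unwrap.delete.handling.mode": "rewrite",
        # Turn the string "true"/"false" in __deleted into a boolean.
        "transforms.castDeleted.type": "org.apache.kafka.connect.transforms.Cast$Value",
        "transforms.castDeleted.spec": "__deleted:boolean",
    },
}


def unwrap_rewrite(envelope: Optional[Row]) -> Optional[Row]:
    """Simplified model of ExtractNewRecordState with delete.handling.mode=rewrite."""
    if envelope is None:  # tombstone, passed through because drop.tombstones=false
        return None
    if envelope["op"] == "d":  # delete: emit the old row state, flagged as deleted
        return {**envelope["before"], "__deleted": "true"}
    return {**envelope["after"], "__deleted": "false"}  # create/update/read


def cast_deleted(row: Optional[Row]) -> Optional[Row]:
    """Simplified model of Cast$Value with spec __deleted:boolean."""
    if row is None:
        return None
    return {**row, "__deleted": row["__deleted"] == "true"}


delete_event = {"before": {"id": 42, "name": "alice"}, "after": None, "op": "d"}
print(cast_deleted(unwrap_rewrite(delete_event)))
# {'id': 42, 'name': 'alice', '__deleted': True}
print(json.dumps(config, indent=2))
```

The point of the rewrite mode is that the delete arrives at the sink as an ordinary flat row instead of an "op": "d" envelope, so the sink inserts it like any other row.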

This works if you are using native Debezium connectors, since the ExtractNewRecordState transformation is only available with Debezium connectors. All that is then needed is DELETE FROM ... WHERE __deleted = true.

gingerwizard

You will need to use a ReplacingMergeTree. This requires you to:

  1. Send the whole record again for updates, with a higher version number;
  2. For deletes, send the record with its is_deleted column set to 1.

Rows are uniquely identified by the ORDER BY columns of the table.

Details: https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree
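The row-collapsing rule described above can be sketched as a small in-memory model of what SELECT ... FINAL returns from a table declared as `ENGINE = ReplacingMergeTree(version, is_deleted) ORDER BY id` (the is_deleted parameter needs a recent ClickHouse release). It is a simplified illustration, not ClickHouse's implementation: it ignores partitions, merge timing and OPTIMIZE, and the column names version, is_deleted and id are hypothetical.

```python
from typing import Any

Row = dict[str, Any]


def replacing_final(rows: list[Row], key: str = "id") -> list[Row]:
    """Simplified model of ReplacingMergeTree(version, is_deleted) + FINAL."""
    latest: dict[Any, Row] = {}
    for row in rows:  # rows in insertion order
        current = latest.get(row[key])
        # Keep the highest version; on a tie, the later insert wins.
        if current is None or row["version"] >= current["version"]:
            latest[row[key]] = row
    # Drop keys whose surviving row is marked as deleted.
    return [r for r in latest.values() if r["is_deleted"] == 0]


rows = [
    {"id": 1, "name": "alice", "version": 1, "is_deleted": 0},
    {"id": 2, "name": "bob", "version": 1, "is_deleted": 0},
    {"id": 1, "name": "alice v2", "version": 2, "is_deleted": 0},  # update
    {"id": 2, "name": "bob", "version": 2, "is_deleted": 1},  # delete
]
print(replacing_final(rows))
# [{'id': 1, 'name': 'alice v2', 'version': 2, 'is_deleted': 0}]
```

Because both updates and deletes are plain inserts with a higher version, the sink never has to issue a DELETE itself.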
