Reputation: 616
We are trying to move our database from mysql to couchbase and implement some CDC (change data capture) logic for copying data to our new db.
All enviroments set up and running. mysql, debezium, kafka, couchbase, kubernetes, pipeline etc. And also we are set up our kafka-source connector for debezium. here it is:
- name: "our-connector"
config:
connector.class: "io.debezium.connector.mysql.MySqlConnector"
tasks.max: "1"
group.id: "our-connector"
database.server.name: "our-api"
database.hostname: "******"
database.user: "******"
database.password: "******"
database.port: "3306"
database.include.list: "our_db"
column.include.list: "our_db.our_table.our_field"
table.include.list: "our_db.our_table"
database.history.kafka.topic: "inf.our_table.our_db.schema-changes"
database.history.kafka.bootstrap.servers: "kafka-cluster-kafka-bootstrap.kafka:9092"
value.converter: "org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enable: "false"
key.converter: "org.apache.kafka.connect.json.JsonConverter"
key.converter.schemas.enable: "false"
snapshot.locking.mode: "none"
tombstones.on.delete: "false"
event.deserialization.failure.handling.mode: "ignore"
database.history.skip.unparseable.ddl: "true"
include.schema.changes: "false"
snapshot.mode: "initial"
transforms: "extract,filter,unwrap"
predicates: "isOurTableChangeOurField"
predicates.isOurTableChangeOurField.type: "org.apache.kafka.connect.transforms.predicates.TopicNameMatches"
predicates.isOurTableChangeOurField.pattern: "our-api.our_db.our_table"
transforms.filter.type: "com.redhat.insights.kafka.connect.transforms.Filter"
transforms.filter.if: "!!record.value() && record.value().get('op') == 'u' && record.value().get('before').get('our_field') != record.value().get('after').get('our_field')"
transforms.filter.predicate: "isOurTableChangeOurField"
transforms.unwrap.type: "io.debezium.transforms.ExtractNewRecordState"
transforms.unwrap.drop.tombstones: "false"
transforms.unwrap.delete.handling.mode: "drop"
transforms.extract.type: "org.apache.kafka.connect.transforms.ExtractField{{.DOLLAR_SIGN}}Key"
transforms.extract.field: "id"
and this configuration publish this message to kafka. captured from kowl.
as you can see we have original records id and changed fields new value.
no problem so far. Actually we have problem :) Our field is DATETIME type in mysql, but debezium publish it as unixtime.
First question how can we publish this with formatted datetime (YYYY-mm-dd HH:ii:mm for example)?
lets move on.
here is the actual problem. we have searched a lot but all examples are recording whole data to couchbase. but we already created this record in couchbase, just want to data up to date. actually we manipulated data also.
here is example data from couchbase
we want to change only bill.dateAccepted field in couchbase. tried some yaml config but no success on sink.
here is are sink config
- name: "our-sink-connector-1"
config:
connector.class: "com.couchbase.connect.kafka.CouchbaseSinkConnector"
tasks.max: "2"
topics: "our-api.our_db.our_table"
couchbase.seed.nodes: "dev-couchbase-couchbase-cluster.couchbase.svc.cluster.local"
couchbase.bootstrap.timeout: "10s"
couchbase.bucket: "our_bucket"
couchbase.topic.to.collection: "our-api.our_db.our_table=our_bucket._default.ourCollection"
couchbase.username: "*******"
couchbase.password: "*******"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
key.converter.schemas.enable: "false"
value.converter: "org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enable: "false"
connection.bucket : "our_bucket"
connection.cluster_address: "couchbase://couchbase-srv.couchbase"
couchbase.document.id: "${/id}"
Upvotes: 2
Views: 594
Reputation: 8909
The Couchbase source connector does not support watching individual fields. In general, the Couchbase source connector is better suited for replication than for change data capture. See the caveats mentioned in the Delivery Guarantees documentation.
The Couchbase Kafka sink connector supports partial document updates via the built-in SubDocumentSinkHandler
or N1qlSinkHandler
. You can select the sink handler by configuing the couchbase.sink.handler
connector config property, and customize its behavior with the Sub Document Sink Handler config options.
Here's a config snippet that tells the connector to update the bill.dateAccepted
property with the entire value of the Kafka record. (You'd also need to use a Single Message Transform to extract just this field from the source record.)
couchbase.sink.handler=com.couchbase.connect.kafka.handler.sink.SubDocumentSinkHandler
couchbase.subdocument.path=/bill/dateAccepted
If the built-in sink handlers are not flexible enough, you can write your own custom sink handler using the CustomSinkHandler.java
example as a template.
Upvotes: 0
Reputation: 4948
Partial answer to your first question. One approach would be that You can use an SPI converter to convert the unixdatetime to string. if you want to convert all the datetimes and your input message contains many datetime fields, you can just look at the JDBCType and do the conversion https://debezium.io/documentation/reference/stable/development/converters.html
As for extracting I/U , you can write a custom SMT (Single message transform) that has before and after records and also has the operation type (I/U/D) and comparing before and after fields extract the delta. In the past when i tried something for this , I bumped upon the following which came in quite handy as a reference. This way you have a delta field and a key and that can just update instead of updating the full document (though the sink has to support it will come in at some point)
https://github.com/michelin/kafka-connect-transforms-qlik-replicate
Upvotes: 1