Reputation: 929
I'm working with Debezium + Kafka to perform CDC on a MySQL database. Everything is working, but I'm running into an issue with how the TIMESTAMP and DATETIME column formats in MySQL map over to Kafka.
Any TIMESTAMP columns get stored in Kafka in the format "2023-06-22T19:29:26Z", which is perfect for my use case and works fine. However, any DATETIME columns get stored as UNIX timestamps such as 1687462166000, which does not work for my use case.
I'll have to spend a significant amount of time re-engineering things if these columns are stored as UNIX timestamps across the 50+ tables in my project, so I'm looking for an easy way to get those DATETIME values stored in Kafka the same way the TIMESTAMP ones are. I've been looking at SMTs but haven't found a great solution yet (the closest candidate is sketched after the config below). I'm hoping I can fix this by adding a few parameters to the Debezium config and call it a day. Below is some more info to understand what's going on.
Debezium Config
curl -i -X PUT -H "Content-Type:application/json" \
    http://localhost:8083/connectors/mysql-debezium-test/config \
    -d '{
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "44",
        "database.server.name": "asgard2",
        "table.whitelist": "demo.movies,demo.second_movies",
        "database.history.kafka.bootstrap.servers": "broker:29092",
        "database.history.kafka.topic": "dbhistory.demo",
        "decimal.handling.mode": "double",
        "include.schema.changes": "false",
        "snapshot.mode": "schema_only",
        "time.precision.mode": "adaptive",
        "transforms": "unwrap,dropTopicPrefix",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "true",
        "transforms.unwrap.delete.handling.mode": "rewrite",
        "transforms.dropTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.dropTopicPrefix.regex": "asgard2.demo.(.*)",
        "transforms.dropTopicPrefix.replacement": "mysql2.$1",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "log.retention.hours": "120",
        "poll.interval.ms": "30000"
    }'
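For reference, the closest thing I've found among the stock SMTs is Kafka Connect's built-in TimestampConverter, which can render an epoch-millis field as a formatted string. A rough sketch of the extra config (created_at_datetime is the column from my example table below):

"transforms": "unwrap,convertDatetime,dropTopicPrefix",
"transforms.convertDatetime.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.convertDatetime.field": "created_at_datetime",
"transforms.convertDatetime.target.type": "string",
"transforms.convertDatetime.format": "yyyy-MM-dd'T'HH:mm:ss'Z'"

The catch is that each TimestampConverter instance only converts the single field named in field, so I'd need a separate transform for every DATETIME column across the 50+ tables, which is why I'm hoping for one connector-level setting instead.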
MySQL Table + Column Types Example
CREATE TABLE `movies`
(
    `movie_id` int(11) NOT NULL,
    `title` varchar(256) NOT NULL,
    `release_year` int(11) NOT NULL,
    `country` varchar(256) NOT NULL,
    `genres` varchar(256) NOT NULL,
    `actors` varchar(1024) NOT NULL,
    `directors` varchar(512) NOT NULL,
    `composers` varchar(256) NOT NULL,
    `screenwriters` varchar(256) NOT NULL,
    `cinematographer` varchar(256) NOT NULL,
    `production_companies` varchar(256) NOT NULL,
    `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    `created_at_datetime` DATETIME DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (`movie_id`)
);
Message as stored in Kafka
{
    "movie_id": 2,
    "title": "10,000 B.C. try 2",
    "release_year": 2008,
    "country": "United States",
    "genres": "Adventure",
    "actors": "Steven Strait|Camilla Belle|Cliff Curtis|Omar Sharif|Tim Barlow|Marco Khan|Reece Ritchie|Mo Zinal",
    "directors": "Roland Emmerich",
    "composers": "Harald Kloser|Thomas Wanker",
    "screenwriters": "Roland Emmerich|Harald Kloser|John Orloff|Matthew Sand|Robert Rodat",
    "cinematographer": "Ueli Steiger",
    "production_companies": "Warner Bros. Pictures|Legendary Pictures|Centropolis",
    "created_at": "2023-06-22T19:29:26Z",
    "created_at_datetime": 1687462166000,
    "__deleted": "false"
}
Raw data in MySQL
Anybody have any ideas for a quick fix that doesn't involve changing the actual column types in MySQL or performing transformations in the application that consumes these Kafka messages? Any help would be appreciated. Thanks!
Upvotes: 0
Views: 2379
Reputation: 733
Kafka Connect has no native DateTime type. That is why Debezium converts MySQL DATETIME values to a Long (epoch milliseconds), while TIMESTAMP columns are mapped to ZonedTimestamp and serialized as ISO-8601 strings. You need to use a custom Converter to fix it (a sketch follows) or switch to the debezium-jdbc-connect connector.
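A minimal sketch of such a converter, assuming Debezium's CustomConverter SPI (the class and package names here are made up, and the raw value may arrive as a java.time.LocalDateTime during streaming or a java.sql.Timestamp during snapshots, so both are handled):

package com.example; // hypothetical package

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

import org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;

/** Emits MySQL DATETIME columns as ISO-8601 strings instead of epoch millis. */
public class IsoDatetimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    // 'Z' is a literal, matching the string you already get for TIMESTAMP columns.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");

    @Override
    public void configure(Properties props) {
        // No options needed for this sketch.
    }

    @Override
    public void converterFor(RelationalColumn column,
                             ConverterRegistration<SchemaBuilder> registration) {
        if (!"DATETIME".equalsIgnoreCase(column.typeName())) {
            return; // leave every other column type untouched
        }
        registration.register(SchemaBuilder.string().optional(), value -> {
            if (value == null) {
                return null;
            }
            if (value instanceof LocalDateTime) {      // streaming path
                return ((LocalDateTime) value).format(FORMAT);
            }
            if (value instanceof java.sql.Timestamp) { // snapshot path
                return ((java.sql.Timestamp) value).toLocalDateTime().format(FORMAT);
            }
            return value.toString();                   // fallback for anything unexpected
        });
    }
}

Package the class onto the Connect worker's plugin path and wire it up in the connector config:

"converters": "isodatetime",
"isodatetime.type": "com.example.IsoDatetimeConverter"

Since the converter keys off the column type rather than a column name, one converter covers every DATETIME column in all 50+ tables.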
Upvotes: 1