I need to replicate a MySQL database to a PostgreSQL database. I opted for Debezium (MySQL source connector) with Kafka Connect and the Confluent JDBC sink connector.
The data is being replicated; however, I am losing some schema information. For example, a column with a datetime type in MySQL is replicated as a bigint in Postgres, foreign keys are not created, and the order of columns is not preserved (which would be nice to have), etc.
PostgreSQL sink connector:
{
  "name": "jdbc-sink-dbt",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "2",
    "topics.regex": "test_(.*)",
    "connection.url": "jdbc:postgresql://dbt-postgres:5432/test?user=postgres&password=postgres",
    "transforms": "unwrap,removePrefix",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.removePrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.removePrefix.regex": "test_([^.]+)",
    "transforms.removePrefix.replacement": "$1",
    "auto.create": "true",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "pk.fields": "id",
    "pk.mode": "record_key",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}
MySQL connector:
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "172.17.0.1",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.allowPublicKeyRetrieval": "true",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.include.list": "test",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.test",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$2_$3",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}
Debezium connect configuration:
KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
For example, a column with a datetime type in MySQL is replicated as a bigint
This is due to the default time.precision.mode used by the Debezium connector on the source side. If you look at the documentation, you'll notice that the default precision mode emits datetime columns as INT64, which explains why the sink connector writes the contents as a bigint.
You can set time.precision.mode to connect on the source side for now so that the values can be properly interpreted by the JDBC sink connector.
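A minimal sketch of that change, reusing the source connector config from the question with only the time.precision.mode property added:

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "time.precision.mode": "connect",
    "database.hostname": "172.17.0.1",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.allowPublicKeyRetrieval": "true",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.include.list": "test",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.test",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$2_$3",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

In connect mode, Debezium emits datetime columns using Kafka Connect's built-in logical timestamp types (millisecond precision) rather than raw epoch integers, which the JDBC sink can map to proper timestamp columns in PostgreSQL.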
foreign keys are not created
That's to be expected, see this Confluent GitHub issue. At this time, the JDBC sink connector is not capable of materializing foreign key relationships at the JDBC level.
order of columns is not preserved
That is also to be expected. There is no guarantee that Debezium emits the relational columns in the exact same order as they appear in the database (although we do), and the JDBC sink connector makes no guarantee that it retains the order of the fields as they are read from the emitted event. If the sink connector uses a container like a HashMap to store column names, it's plausible that the resulting order would be very different from the source database.
If you need the destination system to mirror the source's relational metadata, such as foreign keys and column order, you may need to look into a separate toolchain that replicates the initial schema and relationships through some type of schema dump, translation, and import into the destination database, and then rely on the CDC pipeline for the data replication itself.
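A rough sketch of such a toolchain, assuming pgloader is available (the hostnames and credentials below are taken from the configs above; check the pgloader documentation for the exact options supported by your version): pre-create the schema in PostgreSQL once, before starting the CDC pipeline, so that foreign keys and column order come from the original MySQL DDL.

# Copy only the table definitions from MySQL to PostgreSQL (no data);
# pgloader translates the DDL and, with its defaults, also creates foreign keys.
pgloader --with "schema only" \
  mysql://debezium:dbz@172.17.0.1/test \
  postgresql://postgres:postgres@dbt-postgres:5432/test

# Alternative: dump the DDL with mysqldump --no-data, translate it to
# PostgreSQL syntax by hand, and apply it with psql.

With the schema created up front, you would typically set auto.create to false on the sink connector so it writes into the pre-created tables instead of generating its own.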