Reputation: 173
I'm working with a Kafka cluster that replicates data from a MySQL DB to an old Oracle DB. To achieve that, I've connected the Source DB to the Debezium MySQL connector and the Sink DB to the Debezium connector for JDBC.
I'm trying to replicate every table of a DB and I'm running into an issue at the Sink level.
Basically, on the Source server there's a table column with a NOT NULL constraint, and whoever inserted the data on that DB worked around the requirement by inserting empty "" strings.
The Debezium JDBC Sink connector correctly reads the schema of the source table and recreates it on the Sink DB with all of its constraints, but unfortunately Oracle interprets an empty string as a NULL value, so the INSERT gets rejected by the DB and Debezium crashes (of course).
This is a typical payload that Debezium (Source) writes into the Kafka cluster; as you can see, the third field contains an empty string:
...
"payload": {
    "before": null,
    "after": {
        "field1": 852,
        "field2": 480,
        "field3": ""
    },
...
Is there an easy way to handle this problem at a general level, i.e. a solution that works for any field of any table that might contain an empty string, without having to specify the field's name?
Unfortunately, the Single Message Transform that replaces a field's value with another value requires the field's name, and that defeats the purpose of having Debezium take care of everything.
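For illustration, this is roughly how such a per-field transform would have to be configured (here Kafka's built-in MaskField with its replacement option; the transform and field names are just placeholders). It needs every field spelled out, and it would also overwrite non-empty values:
"transforms": "blankField3",
"transforms.blankField3.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.blankField3.fields": "field3",
"transforms.blankField3.replacement": " "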
This is the current configuration of the Debezium JDBC Sink connector:
{
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:oracle:thin:@ldap://ldpap.com:3060/testdb,cn=OracleContext,dc=domain,dc=priv",
    "connection.username": "username",
    "connection.password": "password",
    "insert.mode": "insert",
    "schema.evolution": "basic",
    "database.time_zone": "UTC",
    "topics.regex": "(DBNAME\\.DBNAME\\.([^.]+)$)",
    "quote.identifiers": "true"
}
The connector correctly binds the Kafka topics containing the data and writes a few tables of the Source DB to the Sink DB, until it reaches the offending table and crashes. I have also tried setting "insert.mode": "upsert", with Debezium taking care of the primary keys.
Upvotes: 0
Views: 545
Reputation: 35603
I'm afraid there really is no simple solution to this that I can find.
Apparently, you might be able to create your own custom SMT that intercepts the data and replaces empty strings with some other acceptable character or string - but (a) this is outside my expertise, and (b) I think it may be too risky and would require extensive testing. (Plus this info was gained through an AI, so that may be even more risky.)
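For what it's worth, here is a minimal sketch of what such a custom SMT might look like (the class name and the replacement value are my own invention, and it is untested - treat it as a starting point, not a working solution):
package com.example.kafka.smt;

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

/**
 * Hypothetical SMT that replaces empty string fields with a single space
 * so that Oracle does not treat them as NULL.
 *
 * Limitation: it only walks the top level of the value Struct, so with the
 * raw Debezium envelope (before/after) you would need to recurse into
 * nested Structs, or apply it after an unwrap transform.
 */
public class ReplaceEmptyStrings<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final String REPLACEMENT = " "; // pick whatever Oracle should store

    @Override
    public R apply(R record) {
        // Pass tombstones and schemaless records through untouched
        if (!(record.value() instanceof Struct)) {
            return record;
        }
        Struct value = (Struct) record.value();
        Struct updated = new Struct(value.schema());
        for (Field field : value.schema().fields()) {
            Object v = value.get(field);
            if (field.schema().type() == Schema.Type.STRING && "".equals(v)) {
                updated.put(field, REPLACEMENT);
            } else {
                updated.put(field, v);
            }
        }
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                value.schema(), updated, record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public void close() {
    }
}
You would package this as a jar, put it on the Connect plugin path and reference it from the sink connector's "transforms" configuration.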
So the only relatively simple approach I can think of is to locate all the tables/columns in MySQL that will give rise to this issue and to address it in MySQL before the transfer, e.g. by removing the NOT NULL constraint or by updating the empty strings to something else.
So, find the NOT NULL string columns and test whether they contain empty strings:
DELIMITER //
CREATE PROCEDURE FindEmptyStringColumns()
BEGIN
    DECLARE done INT DEFAULT FALSE;
    DECLARE tableName CHAR(64);
    DECLARE columnName CHAR(64);
    -- All NOT NULL string columns in the current schema
    DECLARE cur CURSOR FOR
        SELECT TABLE_NAME, COLUMN_NAME
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_SCHEMA = DATABASE()
          AND DATA_TYPE IN ('char', 'varchar', 'text')
          AND IS_NULLABLE = 'NO';
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;
    OPEN cur;
    read_loop: LOOP
        FETCH cur INTO tableName, columnName;
        IF done THEN
            LEAVE read_loop;
        END IF;
        -- Count empty (or whitespace-only) values in this column;
        -- identifiers are backtick-quoted in case of reserved words
        SET @s = CONCAT('SELECT "', tableName, '.', columnName,
                        '" AS table_column, COUNT(*) AS empty_string_count ',
                        'FROM `', tableName,
                        '` WHERE TRIM(`', columnName, '`) = ""');
        PREPARE stmt FROM @s;
        EXECUTE stmt;
        DEALLOCATE PREPARE stmt;
    END LOOP;
    CLOSE cur;
END//
DELIMITER ;
then:
CALL FindEmptyStringColumns();
Of course this is just a suggestion, it will be up to you to decide what to do with any columns detected.
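For example, once a column is flagged, you might fix it up in MySQL before the transfer (table/column names and the replacement value below are placeholders):
-- Option 1: replace the empty strings with a value Oracle will accept
UPDATE some_table
SET some_column = ' '
WHERE TRIM(some_column) = '';

-- Option 2: drop the NOT NULL constraint instead
-- (MODIFY must repeat the full column definition; VARCHAR(255) is a placeholder)
ALTER TABLE some_table
MODIFY some_column VARCHAR(255) NULL;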
Upvotes: 0