I am using Kafka Connect with the Debezium JDBC sink connector.
My config is:
"config": {
  "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
  "table.name.format": "canine",
  "dialect.name": "PostgreSqlDatabaseDialect",
  "connection.password": "redacted",
  "primary.key.mode": "record_key",
  "primary.key.fields": "staff_id",
  "database.time_zone": "UTC",
  "snapshot.mode": "no_data",
  "topics": "bpets_dev.BPETS.STAFFING.Canine",
  "task.max": "1",
  "connection.username": "bpets_kafka",
  "delete.enabled": "true",
  "schema.evolution": "none",
  "quote.identifiers": "true",
  "name": "dev-becn-canine-connector",
  "auto.create": "false",
  "auto.evolve": "false",
  "connection.url": "jdbc:postgresql://redacted:5432/becn?currentSchema=staff",
  "insert.mode": "upsert",
  "transforms": "RenameField,RenameKey",
  "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.RenameField.renames": "Id:canine_id,Created:create_dttm,CreatedBy:create_by,Updated:update_dttm,UpdatedBy:update_by,Name:canine_nm,DutyEndDate:duty_end_dttm,DutyStartDate:duty_start_dttm,StaffId:staff_id",
  "transforms.RenameKey.type": "org.apache.kafka.connect.transforms.ReplaceField$Key",
  "transforms.RenameKey.renames": "Id:canine_id"
}
We are sinking to an existing table and do not want Kafka Connect to alter it. However, the connector still attempts an ALTER TABLE for every NOT NULL column in the table, and the task fails. Is there a workaround for this? The stack trace is:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
    at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:96)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
    ... 11 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
    at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:229)
    at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:154)
    at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:103)
    ... 12 more
Caused by: java.sql.SQLException: Cannot ALTER table 'canine' because field 'StaffId' is not optional but has no default value
    at io.debezium.connector.jdbc.JdbcChangeEventSink.alterTableIfNeeded(JdbcChangeEventSink.java:340)
    at io.debezium.connector.jdbc.JdbcChangeEventSink.checkAndApplyTableChangesIfNeeded(JdbcChangeEventSink.java:277)
    at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:218)
    ... 14 more
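One detail that may help narrow this down: the exception names the field as 'StaffId', while the config renames it to staff_id. The small Python sketch below checks which side of the rename a field name falls on, using the `transforms.RenameField.renames` value copied from the config. The helper names `parse_renames` and `classify` are hypothetical and exist only for this illustration; they are not part of Kafka Connect or Debezium. The result is only a hint that the sink may be seeing the field before the rename takes effect, not a confirmed diagnosis.

```python
# Renames copied verbatim from transforms.RenameField.renames in the config.
RENAMES = (
    "Id:canine_id,Created:create_dttm,CreatedBy:create_by,"
    "Updated:update_dttm,UpdatedBy:update_by,Name:canine_nm,"
    "DutyEndDate:duty_end_dttm,DutyStartDate:duty_start_dttm,StaffId:staff_id"
)

# Field name quoted in the SQLException message.
ERROR_FIELD = "StaffId"


def parse_renames(spec: str) -> dict[str, str]:
    """Turn 'Old:new,Old2:new2' into {'Old': 'new', 'Old2': 'new2'}."""
    pairs = (item.split(":", 1) for item in spec.split(",") if item.strip())
    return {old.strip(): new.strip() for old, new in pairs}


def classify(field: str, renames: dict[str, str]) -> str:
    """Report whether a name is a pre-rename source, a post-rename target, or neither."""
    if field in renames:
        return "source"
    if field in renames.values():
        return "target"
    return "unknown"


if __name__ == "__main__":
    mapping = parse_renames(RENAMES)
    # Shows which side of the rename the failing field name belongs to.
    print(ERROR_FIELD, "->", classify(ERROR_FIELD, mapping))
```

If the failing name is classified as a source name, the column the sink is looking for would not match the renamed column in the existing table, which is consistent with it treating the field as missing.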