Reputation: 8229
After setting up both source and sink connectors, I run into problems with DATE-type Postgres columns:
ERROR: column "foo" is of type date but expression is of type integer
I checked the Avro schema and see that the column foo was serialized as io.debezium.time.Date:
{
  "default": null,
  "name": "foo",
  "type": [
    "null",
    {
      "connect.name": "io.debezium.time.Date",
      "connect.version": 1,
      "type": "int"
    }
  ]
}
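As far as I can tell from the Debezium docs, io.debezium.time.Date stores the date as the number of days since the Unix epoch, so (taking the id and foo values from the stacktrace below as an illustration) the record value the sink receives is effectively:

{
  "id": 75046,
  "foo": 18577
}

where 18577 is 2020-11-11 counted in days since 1970-01-01, which is why the sink ends up trying to insert an integer into a DATE column.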
What should I do so the sink connector inserts these values correctly (as DATE, not INTEGER)?
Full stacktrace:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "test_table" ("id","foo") VALUES (75046,18577) ON CONFLICT ("id") DO UPDATE SET "foo"=EXCLUDED."foo" was aborted: ERROR: column "foo" is of type date but expression is of type integer
Hint: You will need to rewrite or cast the expression.
Position: 249 Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
Hint: You will need to rewrite or cast the expression.
Position: 249
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
Hint: You will need to rewrite or cast the expression.
Position: 249
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:89)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
... 10 more
Caused by: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "test_table" ("id","foo") VALUES (75046,18577) ON CONFLICT ("id") DO UPDATE SET "foo"=EXCLUDED."foo" was aborted: ERROR: column "foo" is of type date but expression is of type integer
Hint: You will need to rewrite or cast the expression.
Position: 249 Call getNextException to see other errors in the batch.
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
Hint: You will need to rewrite or cast the expression.
Position: 249
org.postgresql.util.PSQLException: ERROR: column "foo" is of type date but expression is of type integer
Hint: You will need to rewrite or cast the expression.
Position: 249
... 12 more
Source config:
{
  "name": "dbz-source-test-1",
  "config": {
    "name": "dbz-source-test-1",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "some.host",
    "database.port": "5432",
    "database.user": "test_debezium",
    "database.password": "password",
    "database.dbname": "dbname",
    "plugin.name": "wal2json_rds",
    "slot.name": "wal2json_rds",
    "database.server.name": "server_test",
    "table.whitelist": "public.test_table",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "dbz_source_$3",
    "topic.selection.strategy": "topic_per_table",
    "include.unknown.datatypes": true,
    "decimal.handling.mode": "double",
    "snapshot.mode": "never"
  }
}
Sink config:
{
  "name": "dbz-sink-test-1",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "config.providers": "file",
    "config.providers.file.class": "org.apache.kafka.common.config.provider.FileConfigProvider",
    "config.providers.file.param.secrets": "/opt/mysecrets",
    "topics": "dbz_source_test_table",
    "connection.url": "someurl",
    "connection.user": "${file:/opt/mysecrets.properties:user}",
    "connection.password": "${file:/opt/mysecrets.properties:pass}",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "table.name.format": "dbz_source_",
    "insert.mode": "upsert",
    "pk.field": "id",
    "pk.mode": "record_value"
  }
}
Upvotes: 3
Views: 2936
Reputation: 8229
I fixed the problem by switching the source connector's time.precision.mode config to connect. From the Debezium documentation:
When the time.precision.mode configuration property is set to connect, then the connector will use the predefined Kafka Connect logical types. This may be useful when consumers only know about the built-in Kafka Connect logical types and are unable to handle variable-precision time values.
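Concretely, that means adding a single property to the "config" section of the dbz-source-test-1 source connector from the question (everything else stays as posted):

"time.precision.mode": "connect"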
After that, the serialized type is different:
{
  "default": null,
  "name": "foo",
  "type": [
    "null",
    {
      "connect.name": "org.apache.kafka.connect.data.Date",
      "connect.version": 1,
      "logicalType": "date",
      "type": "int"
    }
  ]
}
The sink connector is aware of the org.apache.kafka.connect.data.Date logical type and inserts the values correctly as DATE.
Upvotes: 4