Reputation: 11
We have an application writing messages to a Kafka topic in the following format.
{
"sub_id": "574",
"sub_name": "john"
}
We are trying to sink it with the following Debezium JDBC sink connector config.
config:
class: io.debezium.connector.jdbc.JdbcSinkConnector
connection.url: jdbc:postgresql://10.10.10.10:26257/db_dev
connection.username: "******"
connection.password: "********"
topics: "kafka-crdb"
insert.mode: "upsert"
primary.key.mode: "none"
primary.key.fields: "sub_id"
value.converter: "org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enable: "false"
transforms: "unwrap"
transforms.unwrap.type: "io.debezium.transforms.ExtractNewRecordState"
but it fails with the following error:
2024-09-27 09:27:03,379 ERROR [kafka-sink-connector|task-0] Failed to process record: Failed to process a sink record (io.debezium.connector.jdbc.JdbcSinkConnectorTask) [task-thread-kafka-sink-connector-crdb-0]
org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
at io.debezium.connector.jdbc.JdbcChangeEventSink.buildRecordSinkDescriptor(JdbcChangeEventSink.java:200)
at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:85)
at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:103)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Schema.name()" because the return value of "org.apache.kafka.connect.sink.SinkRecord.valueSchema()" is null
at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.isFlattened(SinkRecordDescriptor.java:321)
at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.build(SinkRecordDescriptor.java:310)
at io.debezium.connector.jdbc.JdbcChangeEventSink.buildRecordSinkDescriptor(JdbcChangeEventSink.java:197)
... 14 more
I have also tried removing the following lines from the sink connector config:
value.converter: "org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enable: "false"
transforms: "unwrap"
transforms.unwrap.type: "io.debezium.transforms.ExtractNewRecordState"
but then received the following error:
Caused by: org.apache.kafka.connect.errors.DataException:
JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields.
If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
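For reference, with schemas.enable left at its default of true, JsonConverter expects every message to carry an inline schema envelope alongside the payload. A sketch of what the sample message above would look like in that form (the field types and optional flags here are assumptions, since the original message carries no schema):

```json
{
  "schema": {
    "type": "struct",
    "fields": [
      { "field": "sub_id", "type": "string", "optional": false },
      { "field": "sub_name", "type": "string", "optional": true }
    ],
    "optional": false
  },
  "payload": {
    "sub_id": "574",
    "sub_name": "john"
  }
}
```

With schemas.enable=false the converter instead produces a schemaless record, i.e. SinkRecord.valueSchema() is null, which is consistent with the NullPointerException in the first stack trace.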
Upvotes: 1
Views: 283
Reputation: 9116
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Schema.name()" because the return value of "org.apache.kafka.connect.sink.SinkRecord.valueSchema()" is null
I had the same issue in a similar setup. The problem was that the database schema did not match the Avro schema: the Kafka connector wasn't able to convert from Float32 to Decimal(2) on its own.
The exception message and logs were not helpful to narrow down the problem, so hopefully this will help others.
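As a hypothetical illustration of that kind of mismatch (record and field names here are made up, not from my actual setup): an Avro value schema declaring a plain float field, such as

```json
{
  "type": "record",
  "name": "sub_record",
  "fields": [
    { "name": "amount", "type": "float" }
  ]
}
```

maps to FLOAT32 in Kafka Connect, and sinking that into an existing DECIMAL(2) column fails because the JDBC sink does not cast between those types automatically. The fix is to make the types line up, e.g. by using Avro's decimal logical type on the source side or changing the target column type.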
Upvotes: 0