Jain Joseph

Reputation: 11

Unable to sink from Kafka topic to Postgres DB with Debezium JDBC sink connector

We have an application writing messages to a Kafka topic in the following format.

{
    "sub_id": "574",
    "sub_name": "john"
}

We are trying to sink these messages with the following Debezium JDBC sink connector config.

  config:
    class: io.debezium.connector.jdbc.JdbcSinkConnector

    connection.url: jdbc:postgresql://10.10.10.10:26257/db_dev
    connection.username: "******"
    connection.password: "********"
    topics: "kafka-crdb" 
    insert.mode: "upsert"
    primary.key.mode: "none"
    primary.key.fields: "sub_id"
    value.converter: "org.apache.kafka.connect.json.JsonConverter"
    value.converter.schemas.enable: "false"
    transforms: "unwrap"
    transforms.unwrap.type: "io.debezium.transforms.ExtractNewRecordState"

However, it fails with the following error:

2024-09-27 09:27:03,379 ERROR [kafka-sink-connector|task-0] Failed to process record: Failed to process a sink record (io.debezium.connector.jdbc.JdbcSinkConnectorTask) [task-thread-kafka-sink-connector-crdb-0]
org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
        at io.debezium.connector.jdbc.JdbcChangeEventSink.buildRecordSinkDescriptor(JdbcChangeEventSink.java:200)
        at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:85)
        at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:103)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Schema.name()" because the return value of "org.apache.kafka.connect.sink.SinkRecord.valueSchema()" is null
        at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.isFlattened(SinkRecordDescriptor.java:321)
        at io.debezium.connector.jdbc.SinkRecordDescriptor$Builder.build(SinkRecordDescriptor.java:310)
        at io.debezium.connector.jdbc.JdbcChangeEventSink.buildRecordSinkDescriptor(JdbcChangeEventSink.java:197)
        ... 14 more

I have also tried removing the following lines from the sink connector config

value.converter: "org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enable: "false"
transforms: "unwrap"
transforms.unwrap.type: "io.debezium.transforms.ExtractNewRecordState"

but received the following error

Caused by: org.apache.kafka.connect.errors.DataException: 
JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. 
If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.

Upvotes: 1

Views: 283

Answers (1)

Joseandro Luiz

Reputation: 9116

Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.kafka.connect.data.Schema.name()" because the return value of "org.apache.kafka.connect.sink.SinkRecord.valueSchema()" is null

I had the same issue in a similar setup. In my case, the database schema did not match the Avro schema: the Kafka connector could not convert from Float32 to Decimal(2) on its own.

The exception message and logs did not help narrow down the problem, so hopefully this helps others.
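
Separately, the NullPointerException quoted above says that SinkRecord.valueSchema() returned null, meaning the record reached the sink without a value schema. The second error in the question shows what JsonConverter expects when schemas are enabled: a message with "schema" and "payload" fields. Below is a minimal Python sketch of wrapping the question's plain JSON record in that envelope. It assumes the converter is switched to value.converter.schemas.enable: "true", that every field is a required string, and that the producing application can be changed; the function name wrap_with_schema and the schema name "subscriber" are hypothetical, and I have not verified this against the asker's setup.

```python
import json


def wrap_with_schema(record, name="subscriber"):
    """Wrap a flat dict of string values in a Kafka Connect JSON envelope.

    Assumption: every value is a required string. The schema name is a
    made-up placeholder, not something taken from the question.
    """
    fields = [
        {"field": key, "type": "string", "optional": False}
        for key in record
    ]
    return {
        "schema": {
            "type": "struct",
            "name": name,
            "optional": False,
            "fields": fields,
        },
        "payload": record,
    }


# The record from the question, wrapped before it is written to the topic.
message = wrap_with_schema({"sub_id": "574", "sub_name": "john"})
print(json.dumps(message, indent=2))
```

The producer would then write the serialized message to the topic instead of the bare record, so the converter can build a value schema for the sink.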

Upvotes: 0
