vigneshwar

Reputation: 161

Importing JSON data into Postgres using the Kafka JDBC sink connector

I'm new to Kafka. My requirement is to load JSON data from a Kafka topic into a Postgres table.

Below is the table structure:

CREATE TABLE dup_emp (
    emp_id integer PRIMARY KEY,
    emp_name text,
    emp_salary integer 
);

I was producing messages in JSON format onto the topic manually (i.e., from the console producer). Example: {"emp_id":1,"emp_name":"bheem","emp_salary":2000}
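For reference, this is roughly how I was sending them (assuming the broker listens on localhost:9092; on older Kafka versions the flag is --broker-list instead of --bootstrap-server):

/opt/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic dup_emp
>{"emp_id":1,"emp_name":"bheem","emp_salary":2000}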

And my connector configuration is as follows:

curl -X PUT http://localhost:8083/connectors/load_test/config \
-H "Content-Type: application/json" \
-d '{
 "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
 "connection.url":"jdbc:postgresql://localhost:5432/somedb",
 "connection.user":"user",
 "connection.password":"passwd",
 "key.converter":"org.apache.kafka.connect.json.JsonConverter",
 "value.converter":"org.apache.kafka.connect.json.JsonConverter",
 "key.converter.schemas.enable":"false",
 "value.converter.schemas.enable":"false",
 "tasks.max" : "1",
 "topics":"dup_emp",
 "table.name.format":"dup_emp",
 "insert.mode":"insert",
 "quote.sql.identifiers":"never"
}'
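After the PUT call, the connector's state can be checked through the Connect REST API (a quick sanity check, assuming Connect runs on localhost:8083 as above):

curl http://localhost:8083/connectors/load_test/status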

And I was ending up with the following error:

Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'load_test' is configured with 'delete.enabled=false' and 'pk.mode=none' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='dup_emp',partition=0,offset=0,timestamp=1633066307312) with a HashMap value and null value schema.

As per my knowledge, when we use a source connector, it stores the schema in the Schema Registry, and the sink connector uses it for validation. But now, since I'm not using a source connector and I was producing the data in JSON format manually, how can I load the data into Postgres?
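From what I've read so far, one option might be to set "value.converter.schemas.enable":"true" and embed the schema into every message myself, something like the following (untested sketch; the field types are my guess for the table above, and the whole thing would have to be sent as a single line from the console producer):

{
  "schema": {
    "type": "struct",
    "name": "dup_emp",
    "optional": false,
    "fields": [
      {"field": "emp_id", "type": "int32", "optional": false},
      {"field": "emp_name", "type": "string", "optional": true},
      {"field": "emp_salary", "type": "int32", "optional": true}
    ]
  },
  "payload": {"emp_id": 1, "emp_name": "bheem", "emp_salary": 2000}
}

But I'm not sure whether that is the intended approach.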

Can someone help me understand the error better and suggest a proper configuration to load the data into Postgres?

Upvotes: 2

Views: 937

Answers (1)

esefe sgrgrz

Reputation: 13

I have a Postgres sink connector and I want to test it. I created a table "test" in the Postgres database:

create table test (id serial primary key, value text)

On the broker's server I can see the message in the topic:

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic postgresql.public.test
{"id":1,"value":"aaa"}

But I get an error:

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields.

How can I register a schema for these messages so I can test the sink connector?
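My guess is that the message needs the same schema-and-payload envelope described in the question above, e.g. (just a guess for my "test" table, sent as one line):

{"schema":{"type":"struct","name":"test","optional":false,"fields":[{"field":"id","type":"int32","optional":false},{"field":"value","type":"string","optional":true}]},"payload":{"id":1,"value":"aaa"}}

but I don't know whether that's the right way to do it.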

Upvotes: 0
