AlfaRomeo

Reputation: 35

Confluent Kafka Sink Connector is not loading data to Postgres table

I am trying to load data into Postgres tables through the Kafka Sink connector, but I am getting the following error:

Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot ALTER to add missing field SinkRecordField{schema=Schema{STRING}, name='A_ABBREV', isPrimaryKey=false}, as it is not optional and does not have a default value

The table in the Postgres DB already has the field A_ABBREV, but I am not sure why I am getting a missing field error.

Has anyone faced a similar issue?

Below is my Sink Connector Configuration:

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
table.name.format=AGENCY
connection.password=passcode
topics=AGENCIES
tasks.max=1
batch.size=10000
fields.whitelist=A_ID, A_NAME, A_ABBREV
connection.user=pmmdevuser
name=partner5-jdbcSinkConnector
connection.url=jdbc:postgresql://aws-db.sdfdgfdrwwisc.us-east-1.rds.amazonaws.com:3306/pmmdevdb?currentSchema=ams
insert.mode=upsert
pk.mode=record_value
pk.fields=A_ID
auto.create=false

I am using Liquibase scripts to create the tables. Below is the CREATE statement, taken from the Postgres DB, for the table that was created through those scripts:

"CREATE TABLE gds.agency
(
    a_id integer NOT NULL,
    a_name character varying(100) COLLATE pg_catalog."default" NOT NULL,
    a_abbrev character varying(8) COLLATE pg_catalog."default" NOT NULL,
    source character varying(255) COLLATE pg_catalog."default" NOT NULL DEFAULT 'AMS'::character varying,
    CONSTRAINT pk_agency PRIMARY KEY (a_id),
    CONSTRAINT a_abbrev_uk1 UNIQUE (a_abbrev)
)"

Upvotes: 1

Views: 2454

Answers (1)

Tassia Paschoal

Reputation: 36

In my experience, this means that the field definition for the sink does not match the field definition in the source table/database. Make sure the field definitions match. Inspect the individual record the sink connector is trying to write to your target DB: you should be able to see the INSERT statement in the stack trace when running in debug mode. Take that query and run it manually to get a clearer picture of the error from the database.
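
As a rough sketch (assuming a standard Connect worker installation; the exact file location and logger name may differ by version), you can turn on debug logging for the JDBC connector by adding a line like this to the worker's connect-log4j.properties and restarting the worker:

log4j.logger.io.confluent.connect.jdbc=DEBUG

You can then compare what the connector expects against the actual column definitions in Postgres, for example (adjust the schema name to wherever your table actually lives):

SELECT column_name, data_type, is_nullable, column_default
FROM information_schema.columns
WHERE table_schema = 'gds' AND table_name = 'agency';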

Upvotes: 2
