Sanjay Nayak

Reputation: 89

Kafka Confluent: How to use pk.mode=record_key for upsert and delete mode in the JDBC sink connector?

In Kafka Confluent, how can I do an upsert when the source is a CSV file, using pk.mode=record_key with a composite key in the MySQL table? Upsert works when I use pk.mode=record_value. Is there any additional configuration that needs to be done?

I am getting this error when I try pk.mode=record_key:

    Caused by: org.apache.kafka.connect.errors.ConnectException: Need exactly one PK column defined since the key schema for records is a primitive type.

Below is my JDBC sink connector configuration:

    {
      "name": "<name>",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "<topic name>",
        "connection.url": "<url>",
        "connection.user": "<user name>",
        "connection.password": "*******",
        "insert.mode": "upsert",
        "batch.size": "50000",
        "table.name.format": "<table name>",
        "pk.mode": "record_key",
        "pk.fields": "field1,field2",
        "auto.create": "true",
        "auto.evolve": "true",
        "max.retries": "10",
        "retry.backoff.ms": "3000",
        "mode": "bulk",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schemas.enable": "true",
        "value.converter.schema.registry.url": "http://localhost:8081"
      }
    }

Upvotes: 3

Views: 13641

Answers (1)

Robin Moffatt

Reputation: 32100

You need to use a pk.mode of record_value. This means: take the named field(s) from the value of the message and use them as the primary key in the target table and for UPSERT purposes.

If you set record_key, the connector will try to take the key field(s) from the Kafka message key. Since your key.converter is the StringConverter, the key is a primitive string, and a primitive key can only map to exactly one PK column, which is why the connector rejects the two columns listed in pk.fields. Unless you've actually got the values in your message key, this is not the setting that you want to use.
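
For illustration, here is a minimal sketch of the question's configuration adjusted to use record_value, assuming field1 and field2 are fields of the Avro message value (connection details and names are the placeholders from the question):

    {
      "name": "<name>",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "<topic name>",
        "connection.url": "<url>",
        "connection.user": "<user name>",
        "connection.password": "*******",
        "insert.mode": "upsert",
        "pk.mode": "record_value",
        "pk.fields": "field1,field2",
        "auto.create": "true",
        "auto.evolve": "true",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081"
      }
    }

With this setting the connector builds its upserts (for MySQL, INSERT ... ON DUPLICATE KEY UPDATE) on the columns named in pk.fields.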

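If you do need the composite key in the message key itself (as the question title asks), one option, as a sketch rather than part of the original answer, is to copy the fields from the value into the key with Kafka Connect's built-in ValueToKey single message transform, so the key becomes a struct that record_key can map to multiple PK columns:

    "transforms": "copyKeyFields",
    "transforms.copyKeyFields.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.copyKeyFields.fields": "field1,field2"

Here copyKeyFields is just an illustrative alias; the transform reads field1 and field2 from the record value and replaces the record key with a struct containing them.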

Upvotes: 11
