Reputation: 89
In Confluent Kafka (Kafka Connect), how can we use upsert mode with a CSV file as the source while using pk.mode=record_key
for a composite key in the MySQL table? Upsert works when using pk.mode=record_value
. Is there any additional configuration that needs to be done?
I am getting this error when I try pk.mode=record_key
: Error - Caused by: org.apache.kafka.connect.errors.ConnectException
: Need exactly one PK column defined since the key schema for records is a primitive type.
Below is my JDBC sink connector configuration:
{
  "name": "<name>",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "<topic name>",
    "connection.url": "<url>",
    "connection.user": "<user name>",
    "connection.password": "*******",
    "insert.mode": "upsert",
    "batch.size": "50000",
    "table.name.format": "<table name>",
    "pk.mode": "record_key",
    "pk.fields": "field1,field2",
    "auto.create": "true",
    "auto.evolve": "true",
    "max.retries": "10",
    "retry.backoff.ms": "3000",
    "mode": "bulk",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "http://localhost:8081"
  }
}
Upvotes: 3
Views: 13641
Reputation: 32100
You need to use pk.mode
of record_value
.
This means: take the field(s) from the value of the message and use them as the primary key in the target table and for UPSERT
purposes.
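As a minimal sketch, the relevant part of your sink configuration would change like this (assuming field1 and field2 are fields present in the Avro value of each message):

    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "field1,field2",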
If you set record_key
, the connector will try to take the key field(s) from the Kafka message key. In your config the key.converter is StringConverter, so the key schema is a primitive string rather than a struct, which is why the connector complains that it needs exactly one PK column. Unless you've actually got the key values in your message key, this is not the setting that you want to use.
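If you really do want the composite key to live in the message key, one option (a sketch, assuming field1 and field2 exist in the value) is to build a struct key inside the sink connector with the ValueToKey transform, and keep pk.mode=record_key:

    "pk.mode": "record_key",
    "pk.fields": "field1,field2",
    "transforms": "createKey",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "field1,field2",

The transform runs after the converters, so the original string key is replaced by a struct containing field1 and field2, and pk.fields then refers to fields of that struct.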
Upvotes: 11