Sam Shiles

Reputation: 11249

Kafka Connect JDBC Connector - Exiting WorkerSinkTask due to unrecoverable exception

I am using the JDBC sink connector and have a bad message in the topic. I know why the message is bad (it is failing due to a FK constraint violation because of an issue with a producer). The error being reported by the worker task is:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: java.sql.BatchUpdateException: Cannot add or update a child row: a foreign key constraint fails (`sensorium`.`reading`, CONSTRAINT `reading_ibfk_1` FOREIGN KEY (`sensorId`) REFERENCES `sensor` (`id`))
com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException: Cannot add or update a child row: a foreign key constraint fails (`sensorium`.`reading`, CONSTRAINT `reading_ibfk_1` FOREIGN KEY (`sensorId`) REFERENCES `sensor` (`id`))
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:86)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
    ... 10 more
Caused by: java.sql.SQLException: java.sql.BatchUpdateException: Cannot add or update a child row: a foreign key constraint fails (`sensorium`.`reading`, CONSTRAINT `reading_ibfk_1` FOREIGN KEY (`sensorId`) REFERENCES `sensor` (`id`))
com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException: Cannot add or update a child row: a foreign key constraint fails (`sensorium`.`reading`, CONSTRAINT `reading_ibfk_1` FOREIGN KEY (`sensorId`) REFERENCES `sensor` (`id`))

What I want is for this bad message to be skipped, so I have tried setting "errors.tolerance": "all". The full config for the sink connector is as follows:

{
    "name": "reading-sink2",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": 4,
        "topics": "READING_MYSQL",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "connection.url": "jdbc:mysql://localhost:3306/sensorium?user=app&password=tQpRMCzHlAeu6kQIBk4U",
        "auto.create": true,
        "table.name.format": "reading",
        "errors.tolerance": "all"
    }
}

But the same error is still being logged, the message is not being skipped, and subsequent messages are not being processed.

Why is errors.tolerance: all not working as expected?

Upvotes: 2

Views: 9125

Answers (2)

Robin Moffatt

Reputation: 32090

You can manually skip bad records, using the kafka-consumer-groups tool:

kafka-consumer-groups \
    --bootstrap-server kafka:29092 \
    --group connect-sink_postgres_foo_00 \
    --reset-offsets \
    --topic foo \
    --to-offset 2 \
    --execute

For more info see here.
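Before resetting, the same tool's --describe flag shows where each partition of the sink's consumer group is currently positioned, which helps identify the offset of the bad record (group and topic names below are the illustrative ones from the command above):

```shell
# Show current offset, log-end offset, and lag per partition for the
# sink connector's consumer group; the partition whose CURRENT-OFFSET
# is stuck points at the bad record.
kafka-consumer-groups \
    --bootstrap-server kafka:29092 \
    --group connect-sink_postgres_foo_00 \
    --describe
```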

I've logged an improvement suggestion for the sink, feel free to upvote: https://github.com/confluentinc/kafka-connect-jdbc/issues/721

Upvotes: 1

Bartosz Wardziński

Reputation: 6593

The errors.tolerance property applies to errors that occur while converting messages (conversion to/from the Kafka Connect schema) or transforming them (applying Single Message Transforms).

You can't skip or swallow exceptions that are thrown during SinkTask::put(Collection<SinkRecord> records) or SourceTask::poll().

In your case the exception is thrown in SinkTask::put(...):

io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:86)


You can read more about error handling in this blog post on the Confluent site: https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues
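For errors that errors.tolerance does cover (converter and transform failures, not this FK violation in put()), the blog describes routing failed records to a dead letter queue. A minimal sketch of the relevant sink connector properties, to be merged into a config like the one in the question (the topic name reading-dlq is illustrative):

```json
{
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "reading-dlq",
    "errors.deadletterqueue.topic.replication.factor": 1,
    "errors.deadletterqueue.context.headers.enable": true,
    "errors.log.enable": true,
    "errors.log.include.messages": true
}
```

With these set, a record that fails deserialization or an SMT is written to the DLQ topic (with failure context in headers) instead of killing the task.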

Upvotes: 4
