Kishore Bandi

Reputation: 5721

Resume Debezium MySQL connector after unexpected AWS RDS binlogs deletion

When Debezium is running as a source connector in Kafka Connect and no updates happen for a while on the monitored MySQL DB (an Amazon RDS instance), I eventually end up with the error below.

[2018-04-25 21:30:14,526] INFO Step 0: Get all known binlogs from MySQL (io.debezium.connector.mysql.MySqlConnectorTask:310)
[2018-04-25 21:30:14,536] INFO Connector requires binlog file 'mysql-bin-changelog.002640', but MySQL only has mysql-bin-changelog.002663, mysql-bin-changelog.002664, mysql-bin-changelog.002665 (io.debezium.connector.mysql.MySqlConnectorTask:323)
[2018-04-25 21:30:14,536] INFO MySQL has the binlog file 'mysql-bin-changelog.002640' required by the connector (io.debezium.connector.mysql.MySqlConnectorTask:325)
[2018-04-25 21:30:14,536] INFO Stopping MySQL connector task (io.debezium.connector.mysql.MySqlConnectorTask:239)
[2018-04-25 21:30:14,536] INFO WorkerSourceTask{id=swiggy-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-04-25 21:30:14,536] INFO WorkerSourceTask{id=swiggy-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
[2018-04-25 21:30:14,536] ERROR WorkerSourceTask{id=swiggy-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.ConnectException: The connector is trying to read binlog starting at binlog file 'mysql-bin-changelog.002640', pos=470, skipping 4 events plus 0 rows, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.
    at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:117)
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:45)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:164)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

When I check the binlogs in MySQL, this is what I see:

mysql> show binary logs;
+----------------------------+-----------+
| Log_name                   | File_size |
+----------------------------+-----------+
| mysql-bin-changelog.002664 |       479 |
| mysql-bin-changelog.002665 |       120 |
+----------------------------+-----------+


mysql> show binlog events;
+----------------------------+-----+-------------+------------+-------------+---------------------------------------------------------------------------------------------------------------------------------+
| Log_name                   | Pos | Event_type  | Server_id  | End_log_pos | Info                                                                                                                            |
+----------------------------+-----+-------------+------------+-------------+---------------------------------------------------------------------------------------------------------------------------------+
| mysql-bin-changelog.002664 |   4 | Format_desc | 1550192458 |         120 | Server ver: 5.6.39-log, Binlog ver: 4                                                                                           |
| mysql-bin-changelog.002664 | 120 | Query       | 1550192458 |         201 | BEGIN                                                                                                                           |
| mysql-bin-changelog.002664 | 201 | Query       | 1550192458 |         391 | use `mysql`; INSERT INTO mysql.rds_heartbeat2(id, value) values (1,1524671965007) ON DUPLICATE KEY UPDATE value = 1524671965007 |
| mysql-bin-changelog.002664 | 391 | Xid         | 1550192458 |         422 | COMMIT /* xid=308462 */                                                                                                         |
| mysql-bin-changelog.002664 | 422 | Rotate      | 1550192458 |         479 | mysql-bin-changelog.002665;pos=4                                                                                                |
+----------------------------+-----+-------------+------------+-------------+---------------------------------------------------------------------------------------------------------------------------------+

Question:

  1. Why was Debezium idle? Why didn't it read the binlog files from MySQL after file 002640?
    The database was not in use by any service, so it can't be that too many writes happened before Debezium could read them.
  2. Why did Amazon RDS for MySQL delete the binlog files when there was no activity?
    This is a test database and only I insert records into it, so there was no outside application activity.
  3. Is there a way to resume the Debezium connector and start processing records from the binlogs currently available in MySQL (assuming I'm okay with losing the unread records)?
    I tried restarting the job, and deleting and re-adding the connector, but I always end up with the same error.
    The only solution that worked for resuming the events was:
    • Delete Kafka Connect's offset topic.
    • Delete and re-add the Debezium connector.
      I want a different approach because in production we'll have many connectors sharing the same offset topic, so deleting it would not be possible.

Upvotes: 5

Views: 6865

Answers (1)

Jiri Pechanec

Reputation: 1976

Please look at the heartbeat.interval.ms config property. It should prevent the situation where Debezium monitors a low-traffic table in a high-traffic environment. In such a case the binlog can be flushed while the current binlog coordinates are never recorded in the offsets topic, because no change events are emitted for the monitored table.
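As a rough illustration of the heartbeat suggestion, the sketch below adds heartbeat.interval.ms to a connector configuration before it is submitted to Kafka Connect. The helper name with_heartbeat, the host name, the server name, and the 10-second interval are all assumptions made for the example; only the heartbeat.interval.ms property itself comes from the answer.

```python
import json


def with_heartbeat(config, interval_ms=10000):
    """Return a copy of a connector config with heartbeat.interval.ms set.

    with_heartbeat is a hypothetical helper, not part of Debezium.
    """
    if interval_ms <= 0:
        raise ValueError("interval_ms must be positive to enable heartbeats")
    updated = dict(config)
    # Kafka Connect connector configs are maps of string values.
    updated["heartbeat.interval.ms"] = str(interval_ms)
    return updated


# Placeholder values; a real config needs the remaining connection settings.
base_config = {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "my-rds-host.example.com",  # assumed host name
    "database.server.name": "my-server",  # assumed logical server name
}

connector_config = with_heartbeat(base_config, interval_ms=10000)
print(json.dumps(connector_config, indent=2))
```

The resulting map would go in the "config" section of the connector definition. The interval should be well below the binlog retention period so that recorded offsets keep advancing while the monitored tables are quiet.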

Regarding the resume: you can recover from this situation by modifying the offsets topic. You would need to insert an offset record for the connector plug-in with binlog coordinates that are still available on the server. There is a Kafka KIP for a tool that would help with this, but for now you need to do it manually.
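A minimal sketch of what such a manual offset record might look like follows. It assumes the layout in which the key is a JSON array of the connector name and a {"server": ...} map, and the value holds the binlog file and position. The helper build_offset_record and the server name "my-server" are hypothetical; the connector name and binlog file are taken from the question's output. Read an existing record from your offsets topic first and copy its key exactly, since the real value may carry additional fields.

```python
import json


def build_offset_record(connector_name, server_name, binlog_file, binlog_pos=4):
    """Build the (key, value) JSON strings for a replacement offset record.

    Hypothetical helper: the key/value layout is an assumption and should be
    checked against an existing record in the Kafka Connect offsets topic.
    """
    # Compact separators, so the key can match an existing key byte for byte.
    key = json.dumps([connector_name, {"server": server_name}], separators=(",", ":"))
    # Position 4 is where the first event of a binlog file starts.
    value = json.dumps({"file": binlog_file, "pos": binlog_pos}, separators=(",", ":"))
    return key, value


key, value = build_offset_record(
    "swiggy-connector",            # connector name seen in the task id
    "my-server",                   # assumed database.server.name
    "mysql-bin-changelog.002664",  # oldest binlog still on the server
)
print(key)
print(value)
```

The record would then be produced to the offsets topic while the connector is stopped, on the same partition as the connector's existing offset records, so that it supersedes the stale coordinates without touching other connectors.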

Upvotes: 2
