Tomas F.

Reputation: 720

Debezium fails to reconnect after lost connection to DB

I'm trying to make Debezium CDC production-ready. One problem I came across is that after a connection loss (e.g. a restart of the source DB), it fails to reconnect:

WorkerSourceTask{id=inventory-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted
io.debezium.DebeziumException: Couldn't obtain encoding for database
  1. Could anyone recommend an out-of-the-box solution for this?
  2. One idea was to build a Docker image on top of the official Debezium image that adds a script which periodically calls GET localhost:8083/connectors/XX/status, checks body.tasks[0].state, and triggers POST localhost:8083/connectors/XX/restart. However, calling /restart does not seem to work.

So far I have only found a very old thread, which was not very helpful: "Debezium source task fails to reconnect to postgresql DB when DB container is re-created". Has anything changed since?

Thank you for any ideas.

Environment:

    {
        "name": "xx-connector",
        "config": {
            "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
            "database.hostname": "192.168.37.51",
            "plugin.name": "pgoutput",
            "database.port": "5432",
            "database.user": "debezium",
            "database.password": "debezium",
            "database.dbname": "xx",
            "schema.include.list": "xx",
            "table.include.list": "xx.x",
            "database.server.name": "xx-db3",
            "topic.prefix": "debezium.testdb",
            "snapshot.mode": "never",
            "topic.creation.enable": "true",
            "topic.creation.default.replication.factor": "1",
            "topic.creation.default.partitions": "1",
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable": "false",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "false"
        }
    }

{
    "name": "inventory-connector",
    "connector": {
        "state": "RUNNING",
        "worker_id": "172.17.0.5:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "worker_id": "172.17.0.5:8083",
            "trace": "io.debezium.DebeziumException: Couldn't obtain encoding for database xx\n\tat io.debezium.connector.postgresql.connection.PostgresConnection.getDatabaseCharset(PostgresConnection.java:577)\n\tat io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:86)\n\tat io.debezium.connector.common.BaseSourceTask.startIfNeededAndPossible(BaseSourceTask.java:251)\n\tat io.debezium.connector.common.BaseSourceTask.poll(BaseSourceTask.java:178)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:469)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:357)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.postgresql.util.PSQLException: Connection to 192.168.37.51:5432 refused. 
Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.\n\tat org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:342)\n\tat org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:54)\n\tat org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:263)\n\tat org.postgresql.Driver.makeConnection(Driver.java:443)\n\tat org.postgresql.Driver.connect(Driver.java:297)\n\tat io.debezium.jdbc.JdbcConnection.lambda$patternBasedFactory$1(JdbcConnection.java:243)\n\tat io.debezium.jdbc.JdbcConnection$ConnectionFactoryDecorator.connect(JdbcConnection.java:129)\n\tat io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:875)\n\tat io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:870)\n\tat io.debezium.connector.postgresql.connection.PostgresConnection.getDatabaseCharset(PostgresConnection.java:574)\n\t... 14 more\nCaused by: java.net.ConnectException: Connection refused (Connection refused)\n\tat java.base/java.net.PlainSocketImpl.socketConnect(Native Method)\n\tat java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)\n\tat java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)\n\tat java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)\n\tat java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)\n\tat java.base/java.net.Socket.connect(Socket.java:609)\n\tat org.postgresql.core.PGStream.createSocket(PGStream.java:243)\n\tat org.postgresql.core.PGStream.<init>(PGStream.java:98)\n\tat org.postgresql.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:132)\n\tat org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:258)\n\t... 23 more\n"
        }
    ],
    "type": "source"
}

Upvotes: 1

Views: 918

Answers (2)

piradian

Reputation: 444

To restart a connector task, try calling:

POST /connectors/{connector-name}/tasks/{task-id}/restart
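
A minimal sketch of that call with curl follows. The base URL, the connector name, and the helper function names are placeholders chosen for illustration. The second URL builder uses the `includeTasks` and `onlyFailed` query parameters, which the connector-level restart endpoint accepts in Kafka Connect 3.0 and later. Without them, POST /connectors/{connector-name}/restart restarts only the Connector instance and leaves failed tasks alone, which may be why a plain /restart appears to do nothing.

```shell
#!/bin/sh
# Sketch only: CONNECT_URL and the function names are illustrative assumptions.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Build the URL that restarts a single task of a connector.
task_restart_url() {
    # $1 = connector name, $2 = task id
    printf '%s/connectors/%s/tasks/%s/restart\n' "$CONNECT_URL" "$1" "$2"
}

# Build the URL that restarts a connector together with its failed tasks
# (query parameters available in Kafka Connect 3.0 and later).
failed_restart_url() {
    # $1 = connector name
    printf '%s/connectors/%s/restart?includeTasks=true&onlyFailed=true\n' "$CONNECT_URL" "$1"
}

# Send the restart request for one task; prints the HTTP status code.
restart_task() {
    curl -s -o /dev/null -w '%{http_code}\n' -X POST "$(task_restart_url "$1" "$2")"
}

# Send the restart request for the connector and all of its failed tasks.
restart_failed() {
    curl -s -o /dev/null -w '%{http_code}\n' -X POST "$(failed_restart_url "$1")"
}
```

For example, `restart_task inventory-connector 0` would target task 0 of the connector from the question, and `restart_failed inventory-connector` would ask the worker to restart only what has failed.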

Upvotes: 0

Tomas F.

Reputation: 720

Calling /restart does not work, but a combination of /stop and /resume does.

Because I find this a bit tricky, question 1 is still relevant.

Here is the script:

#!/bin/ash

# Function to log messages
log_message() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') $1"
}

# Log the start of the script
log_message "Checking liveliness of connectors..."

# Call GET host:8083/connectors/ to get the list of connectors
connectors=$(curl -s http://${DEBEZIUM_CONTAINER_HOST}:8083/connectors/)

# Loop through each connector
for connector in $(echo "${connectors}" | jq -r '.[]'); do
    # Call GET host:8083/connectors/eachConnector/status
    status=$(curl -s http://${DEBEZIUM_CONTAINER_HOST}:8083/connectors/${connector}/status)

    # Check whether the first task is FAILED (quoted, POSIX test so an empty result cannot break the comparison)
    if [ "$(echo "${status}" | jq -r '.tasks[0].state')" = "FAILED" ]; then
        # Log that connector is restarting
        log_message "Restarting connector ${connector}..."

        # Call PUT host:8083/connectors/eachConnector/stop
        curl -X PUT -s http://${DEBEZIUM_CONTAINER_HOST}:8083/connectors/${connector}/stop > /dev/null

        # Call PUT host:8083/connectors/eachConnector/resume
        curl -X PUT -s http://${DEBEZIUM_CONTAINER_HOST}:8083/connectors/${connector}/resume > /dev/null
    else
        # Log that connector is OK
        log_message "Connector ${connector} is OK."
    fi
done

# Log the end of the script
log_message "Connector liveliness check completed."

Dockerfile:

FROM alpine:3.19.1

COPY scripts /scripts
RUN chmod +x /scripts/healthcheck.sh

RUN apk add curl
RUN apk add jq
RUN crontab -l | { cat; echo "* * * * *    /scripts/healthcheck.sh"; } | crontab -

CMD ["crond", "-f", "-d", "8"]
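
The script above inspects only tasks[0]. For a connector with more than one task, a variant of the check could list every failed task id instead. This is a rough sketch, assuming the status JSON has the shape shown in the question and that jq is installed. The function name `failed_task_ids` is hypothetical.

```shell
#!/bin/sh
# Sketch only: failed_task_ids is an illustrative helper, not part of the script above.
# Reads a connector status document on stdin and prints the id of each FAILED task,
# one per line. Requires jq.
failed_task_ids() {
    jq -r '.tasks[]? | select(.state == "FAILED") | .id'
}
```

It could be fed from the same status call the script already makes, e.g. `curl -s "http://${DEBEZIUM_CONTAINER_HOST}:8083/connectors/${connector}/status" | failed_task_ids`. An empty result means no task is in the FAILED state.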

Upvotes: 0
