Artem Moskvin

Reputation: 1

Debezium JDBC sink connector keeps losing its connection to PostgreSQL and I don't know why

Here is the issue I have: the JDBC sink connector fails to write data to the database (PostgreSQL) because it keeps losing its connection. The failure comes with "SQL Error: 0, SQLState: 08006", and that is the only information I get from the connector logs. The database logs say nothing useful, not even about connections.

The thing is, the connector works for a while before it randomly loses the connection. I can restart it and it will work for a while again, before the connection randomly closes once more. Is there something important I am missing?

Configuration (all IPs, passwords and URLs have been replaced with placeholders like <connection IP>):

{
  "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
  "errors.log.include.messages": "true",
  "custom.metric.tags": "connector_name=sink-opr-db-lst-notifications-connector",
  "connection.password": "<password>",
  "tasks.max": "1",
  "transforms": "dropPrefix",
  "transforms.dropPrefix.regex": "([^.]+)\\.([^.]+)\\.([^.]+)\\.([^.]+)",
  "auto.evolve": "true",
  "schema.evolution": "basic",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "insert.mode": "upsert",
  "errors.log.enable": "true",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "hibernate.connection.driver_class": "org.postgresql.Driver",
  "errors.retry.timeout": "-1",
  "transforms.dropPrefix.replacement": "$4",
  "table.name.format": "ntf.${topic}",
  "primary.key.mode": "record_key",
  "database.time_zone": "UTC",
  "truncate.enabled": "true",
  "batch.size": "500",
  "connection.username": "debezium",
  "connection.attempts": "10",
  "consumer.max.poll.records": "500",
  "connection.backoff.ms": "10000",
  "topics.regex": "opr-db-lst\\.NOTIFICATIONS\\.dbo.+",
  "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "key.converter.schemas.enable": "true",
  "delete.enabled": "true",
  "name": "sink-opr-db-lst-notifications-connector",
  "value.converter.schemas.enable": "true",
  "errors.tolerance": "none",
  "auto.create": "true",
  "connection.url": "jdbc:postgresql://<URL>.local:5000/odw"
}
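
One thing I am considering (not verified yet) is that an idle connection gets silently dropped somewhere between the connector and PostgreSQL (firewall, NAT, load balancer), and the pool only notices when it tries to use the dead socket. As a sketch, I could enable TCP keepalive and timeouts directly on the JDBC URL; tcpKeepAlive, socketTimeout and connectTimeout are standard pgjdbc driver parameters, and the values here are only guesses:

{
  // pgjdbc URL parameters appended for illustration; the numeric values are guesses,
  // and comments like these are not valid in the real connector JSON
  "connection.url": "jdbc:postgresql://<URL>.local:5000/odw?tcpKeepAlive=true&socketTimeout=120&connectTimeout=10"
}

(tcpKeepAlive=true asks the OS to keep idle TCP connections alive, socketTimeout=120 makes a stuck read fail after 120 seconds instead of hanging forever, and connectTimeout=10 caps connection attempts at 10 seconds. Everything else in the configuration would stay as it is.)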

Logs from the connector:

[2024-11-08 09:37:45,930] INFO [sink-opr-db-lst-notifications-connector|task-0] App info kafka.consumer for connector-consumer-sink-opr-db-lst-notifications-connector-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:88)
[2024-11-08 09:37:45,927] INFO [sink-opr-db-lst-notifications-connector|task-0] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:694)
[2024-11-08 09:37:45,927] INFO [sink-opr-db-lst-notifications-connector|task-0] Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter (org.apache.kafka.common.metrics.Metrics:688)
[2024-11-08 09:37:45,927] INFO [sink-opr-db-lst-notifications-connector|task-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:688)
[2024-11-08 09:37:45,927] INFO [sink-opr-db-lst-notifications-connector|task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:684)
[2024-11-08 09:37:45,922] INFO [sink-opr-db-lst-notifications-connector|task-0] [Consumer clientId=connector-consumer-sink-opr-db-lst-notifications-connector-0, groupId=connect-sink-opr-db-lst-notifications-connector] Request joining group due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1102)
[2024-11-08 09:37:45,922] INFO [sink-opr-db-lst-notifications-connector|task-0] [Consumer clientId=connector-consumer-sink-opr-db-lst-notifications-connector-0, groupId=connect-sink-opr-db-lst-notifications-connector] Resetting generation and member id due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1055)
[2024-11-08 09:37:45,922] INFO [sink-opr-db-lst-notifications-connector|task-0] [Consumer clientId=connector-consumer-sink-opr-db-lst-notifications-connector-0, groupId=connect-sink-opr-db-lst-notifications-connector] Member connector-consumer-sink-opr-db-lst-notifications-connector-0-6f59b7da-2fc1-44b4-9dd8-a6278430e1e0 sending LeaveGroup request to coordinator <confident IP> (id: 2147483644 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1163)
[2024-11-08 09:37:45,921] INFO [sink-opr-db-lst-notifications-connector|task-0] [Consumer clientId=connector-consumer-sink-opr-db-lst-notifications-connector-0, groupId=connect-sink-opr-db-lst-notifications-connector] Revoke previously assigned partitions <here was too many partitions> (org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker:78)
[2024-11-08 09:37:45,913] INFO [sink-opr-db-lst-notifications-connector|task-0] Closing the session factory (io.debezium.connector.jdbc.JdbcSinkConnectorTask:143)
[2024-11-08 09:37:45,912] WARN [sink-opr-db-lst-notifications-connector|task-0] [c3p0] Another error has occurred [ org.postgresql.util.PSQLException: This connection has been closed. ] which will not be reported to listeners! (com.mchange.v2.c3p0.impl.NewPooledConnection:223)
[2024-11-08 09:37:45,912] WARN [sink-opr-db-lst-notifications-connector|task-0] [c3p0] A PooledConnection that has already signalled a Connection error is still in use! (com.mchange.v2.c3p0.impl.NewPooledConnection:220)
[2024-11-08 09:37:45,910] WARN [sink-opr-db-lst-notifications-connector|task-0] [c3p0] Another error has occurred [ org.postgresql.util.PSQLException: This connection has been closed. ] which will not be reported to listeners! (com.mchange.v2.c3p0.impl.NewPooledConnection:223)
[2024-11-08 09:37:45,910] WARN [sink-opr-db-lst-notifications-connector|task-0] [c3p0] A PooledConnection that has already signalled a Connection error is still in use! (com.mchange.v2.c3p0.impl.NewPooledConnection:220)
[2024-11-08 09:37:45,910] INFO [sink-opr-db-lst-notifications-connector|task-0] Closing session. (io.debezium.connector.jdbc.JdbcChangeEventSink:247)
[2024-11-08 09:37:45,909] ERROR [sink-opr-db-lst-notifications-connector|task-0] WorkerSinkTask{id=sink-opr-db-lst-notifications-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)
[2024-11-08 09:37:45,904] WARN [sink-opr-db-lst-notifications-connector|task-0] WorkerSinkTask{id=sink-opr-db-lst-notifications-connector-0} Ignoring invalid task provided offset NOTIFICATIONS-5/OffsetAndMetadata{offset=8459991, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[<here was too many partitions>] (org.apache.kafka.connect.runtime.WorkerSinkTask:467)
[2024-11-08 09:37:45,903] WARN [sink-opr-db-lst-notifications-connector|task-0] WorkerSinkTask{id=sink-opr-db-lst-notifications-connector-0} Ignoring invalid task provided offset NOTIFICATIONS_DTL-3/OffsetAndMetadata{offset=25765463, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[<here was too many partitions>] (org.apache.kafka.connect.runtime.WorkerSinkTask:467)
[2024-11-08 09:37:45,903] WARN [sink-opr-db-lst-notifications-connector|task-0] WorkerSinkTask{id=sink-opr-db-lst-notifications-connector-0} Ignoring invalid task provided offset NOTIFICATIONS_DTL-1/OffsetAndMetadata{offset=45740880, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[<here was too many partitions>] (org.apache.kafka.connect.runtime.WorkerSinkTask:467)
[2024-11-08 09:37:45,903] WARN [sink-opr-db-lst-notifications-connector|task-0] WorkerSinkTask{id=sink-opr-db-lst-notifications-connector-0} Ignoring invalid task provided offset NOTIFICATIONS-4/OffsetAndMetadata{offset=8455456, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[<here was too many partitions>] (org.apache.kafka.connect.runtime.WorkerSinkTask:467)
[2024-11-08 09:37:45,903] WARN [sink-opr-db-lst-notifications-connector|task-0] WorkerSinkTask{id=sink-opr-db-lst-notifications-connector-0} Ignoring invalid task provided offset NOTIFICATIONS_DTL-4/OffsetAndMetadata{offset=25772499, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[<here was too many partitions>] (org.apache.kafka.connect.runtime.WorkerSinkTask:467)
[2024-11-08 09:37:45,903] WARN [sink-opr-db-lst-notifications-connector|task-0] WorkerSinkTask{id=sink-opr-db-lst-notifications-connector-0} Ignoring invalid task provided offset NOTIFICATIONS_DTL-0/OffsetAndMetadata{offset=45728813, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[<here was too many partitions>] (org.apache.kafka.connect.runtime.WorkerSinkTask:467)
[2024-11-08 09:37:45,898] ERROR [sink-opr-db-lst-notifications-connector|task-0] WorkerSinkTask{id=sink-opr-db-lst-notifications-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: JDBC sink connector failure (org.apache.kafka.connect.runtime.WorkerSinkTask:630)
[2024-11-08 09:37:45,898] ERROR [sink-opr-db-lst-notifications-connector|task-0] JDBC sink connector failure (io.debezium.connector.jdbc.JdbcSinkConnectorTask:95)
[2024-11-08 09:37:45,896] DEBUG [sink-opr-db-lst-notifications-connector|task-0] Rewinding topic NOTIFICATIONS offset to 8455456. (io.debezium.connector.jdbc.JdbcSinkConnectorTask:205)
[2024-11-08 09:37:45,896] DEBUG [sink-opr-db-lst-notifications-connector|task-0] Rewinding topic NOTIFICATIONS offset to 8459991. (io.debezium.connector.jdbc.JdbcSinkConnectorTask:205)
[2024-11-08 09:37:45,896] DEBUG [sink-opr-db-lst-notifications-connector|task-0] Rewinding topic NOTIFICATIONS_DTL offset to 25772499. (io.debezium.connector.jdbc.JdbcSinkConnectorTask:205)
[2024-11-08 09:37:45,896] DEBUG [sink-opr-db-lst-notifications-connector|task-0] Rewinding topic NOTIFICATIONS_DTL offset to 25765463. (io.debezium.connector.jdbc.JdbcSinkConnectorTask:205)
[2024-11-08 09:37:45,896] DEBUG [sink-opr-db-lst-notifications-connector|task-0] Rewinding topic NOTIFICATIONS_DTL offset to 45740880. (io.debezium.connector.jdbc.JdbcSinkConnectorTask:205)
[2024-11-08 09:37:45,896] DEBUG [sink-opr-db-lst-notifications-connector|task-0] Rewinding topic NOTIFICATIONS_DTL offset to 45728813. (io.debezium.connector.jdbc.JdbcSinkConnectorTask:205)
[2024-11-08 09:37:45,895] ERROR [sink-opr-db-lst-notifications-connector|task-0] Failed to process record: Failed to process a sink record (io.debezium.connector.jdbc.JdbcSinkConnectorTask:112)
[2024-11-08 09:37:45,895] ERROR [sink-opr-db-lst-notifications-connector|task-0] An I/O error occurred while sending to the backend. (org.hibernate.engine.jdbc.spi.SqlExceptionHelper:150)
[2024-11-08 09:37:45,894] WARN [sink-opr-db-lst-notifications-connector|task-0] SQL Error: 0, SQLState: 08006 (org.hibernate.engine.jdbc.spi.SqlExceptionHelper:145)

Stack trace:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
    at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:96)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
    ... 11 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
    at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:229)
    at io.debezium.connector.jdbc.JdbcChangeEventSink.lambda$flushBuffers$2(JdbcChangeEventSink.java:207)
    at java.base/java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
    at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffers(JdbcChangeEventSink.java:207)
    at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:159)
    at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:103)
    ... 12 more
Caused by: org.hibernate.exception.JDBCConnectionException: error executing work [An I/O error occurred while sending to the backend.] [n/a]
    at org.hibernate.exception.internal.SQLStateConversionDelegate.convert(SQLStateConversionDelegate.java:100)
    at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:58)
    at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:108)
    at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:94)
    at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.coordinateWork(JdbcCoordinatorImpl.java:308)
    at org.hibernate.internal.AbstractSharedSessionContract.doWork(AbstractSharedSessionContract.java:1053)
    at org.hibernate.internal.AbstractSharedSessionContract.doReturningWork(AbstractSharedSessionContract.java:1049)
    at io.debezium.connector.jdbc.JdbcChangeEventSink.hasTable(JdbcChangeEventSink.java:287)
    at io.debezium.connector.jdbc.JdbcChangeEventSink.checkAndApplyTableChangesIfNeeded(JdbcChangeEventSink.java:256)
    at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:218)
    ... 17 more
Caused by: org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:395)
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:498)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:415)
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:335)
    at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:321)
    at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:297)
    at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:246)
    at org.postgresql.jdbc.PgDatabaseMetaData.getTables(PgDatabaseMetaData.java:1347)
    at com.mchange.v2.c3p0.impl.NewProxyDatabaseMetaData.getTables(NewProxyDatabaseMetaData.java:2938)
    at io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect.tableExists(GeneralDatabaseDialect.java:151)
    at io.debezium.connector.jdbc.dialect.postgres.PostgresDatabaseDialect.tableExists(PostgresDatabaseDialect.java:69)
    at io.debezium.connector.jdbc.JdbcChangeEventSink.lambda$hasTable$3(JdbcChangeEventSink.java:287)
    at org.hibernate.jdbc.WorkExecutor.executeReturningWork(WorkExecutor.java:58)
    at org.hibernate.internal.AbstractSharedSessionContract.lambda$doReturningWork$5(AbstractSharedSessionContract.java:1049)
    at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.coordinateWork(JdbcCoordinatorImpl.java:303)
    ... 22 more
Caused by: java.io.EOFException
    at org.postgresql.core.PGStream.receiveChar(PGStream.java:467)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2155)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:368)
    ... 36 more

For now it looks to me like the problem is somewhere between the connector and the database, but I want to be sure: is there something I am missing? What could potentially cause this problem, and where can I look for more information?
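
If it matters, the next thing I was planning to test is making the connection pool validate connections before handing them out, so a dead connection gets replaced instead of surfacing as SQLState 08006. This is only a sketch and relies on an assumption I have not confirmed: that the Debezium JDBC sink forwards hibernate.*-prefixed properties to the Hibernate/c3p0 pool the same way it already accepts hibernate.connection.driver_class. testConnectionOnCheckout, idleConnectionTestPeriod and preferredTestQuery are standard c3p0 settings:

{
  // Assumption: the connector passes hibernate.c3p0.* keys through to c3p0.
  // Comments are illustrative only and are not valid in the real config.
  "hibernate.c3p0.testConnectionOnCheckout": "true",
  "hibernate.c3p0.idleConnectionTestPeriod": "60",
  "hibernate.c3p0.preferredTestQuery": "SELECT 1"
}

Checkout validation adds a round trip per borrowed connection, so if it helps I would probably keep only the idle test period afterwards, but for diagnosing this it seems acceptable.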

Upvotes: 0

Views: 282

Answers (0)
