I have an issue: my JDBC sink connector fails to write data to the database (PostgreSQL) because it loses its connection. It fails with "SQL Error: 0, SQLState: 08006", and that is the only information I get from the connector logs. The database logs don't show anything useful, not even about connections.
The connector works for a while and then randomly loses the connection. I can restart it, and it works for a while again before the connection randomly drops again. Is there something important I'm missing?
Configuration (all IPs, passwords and URLs have been replaced with placeholders such as <connection IP>):
{
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"errors.log.include.messages": "true",
"custom.metric.tags": "connector_name=sink-opr-db-lst-notifications-connector",
"connection.password": "<password>",
"tasks.max": "1",
"transforms": "dropPrefix",
"transforms.dropPrefix.regex": "([^.]+)\\.([^.]+)\\.([^.]+)\\.([^.]+)",
"auto.evolve": "true",
"schema.evolution": "basic",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"insert.mode": "upsert",
"errors.log.enable": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"hibernate.connection.driver_class": "org.postgresql.Driver",
"errors.retry.timeout": "-1",
"transforms.dropPrefix.replacement": "$4",
"table.name.format": "ntf.${topic}",
"primary.key.mode": "record_key",
"database.time_zone": "UTC",
"truncate.enabled": "true",
"batch.size": "500",
"connection.username": "debezium",
"connection.attempts": "10",
"consumer.max.poll.records": "500",
"connection.backoff.ms": "10000",
"topics.regex": "opr-db-lst\\.NOTIFICATIONS\\.dbo.+",
"transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"key.converter.schemas.enable": "true",
"delete.enabled": "true",
"name": "sink-opr-db-lst-notifications-connector",
"value.converter.schemas.enable": "true",
"errors.tolerance": "none",
"auto.create": "true",
"connection.url": "jdbc:postgresql://<URL>.local:5000/odw"
}
Logs from the connector (newest entries first):
[2024-11-08 09:37:45,930] INFO [sink-opr-db-lst-notifications-connector|task-0] App info kafka.consumer for connector-consumer-sink-opr-db-lst-notifications-connector-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:88)
[2024-11-08 09:37:45,927] INFO [sink-opr-db-lst-notifications-connector|task-0] Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:694)
[2024-11-08 09:37:45,927] INFO [sink-opr-db-lst-notifications-connector|task-0] Closing reporter org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter (org.apache.kafka.common.metrics.Metrics:688)
[2024-11-08 09:37:45,927] INFO [sink-opr-db-lst-notifications-connector|task-0] Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:688)
[2024-11-08 09:37:45,927] INFO [sink-opr-db-lst-notifications-connector|task-0] Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:684)
[2024-11-08 09:37:45,922] INFO [sink-opr-db-lst-notifications-connector|task-0] [Consumer clientId=connector-consumer-sink-opr-db-lst-notifications-connector-0, groupId=connect-sink-opr-db-lst-notifications-connector] Request joining group due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1102)
[2024-11-08 09:37:45,922] INFO [sink-opr-db-lst-notifications-connector|task-0] [Consumer clientId=connector-consumer-sink-opr-db-lst-notifications-connector-0, groupId=connect-sink-opr-db-lst-notifications-connector] Resetting generation and member id due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1055)
[2024-11-08 09:37:45,922] INFO [sink-opr-db-lst-notifications-connector|task-0] [Consumer clientId=connector-consumer-sink-opr-db-lst-notifications-connector-0, groupId=connect-sink-opr-db-lst-notifications-connector] Member connector-consumer-sink-opr-db-lst-notifications-connector-0-6f59b7da-2fc1-44b4-9dd8-a6278430e1e0 sending LeaveGroup request to coordinator <confident IP> (id: 2147483644 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1163)
[2024-11-08 09:37:45,921] INFO [sink-opr-db-lst-notifications-connector|task-0] [Consumer clientId=connector-consumer-sink-opr-db-lst-notifications-connector-0, groupId=connect-sink-opr-db-lst-notifications-connector] Revoke previously assigned partitions <here was too many partitions> (org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker:78)
[2024-11-08 09:37:45,913] INFO [sink-opr-db-lst-notifications-connector|task-0] Closing the session factory (io.debezium.connector.jdbc.JdbcSinkConnectorTask:143)
[2024-11-08 09:37:45,912] WARN [sink-opr-db-lst-notifications-connector|task-0] [c3p0] Another error has occurred [ org.postgresql.util.PSQLException: This connection has been closed. ] which will not be reported to listeners! (com.mchange.v2.c3p0.impl.NewPooledConnection:223)
[2024-11-08 09:37:45,912] WARN [sink-opr-db-lst-notifications-connector|task-0] [c3p0] A PooledConnection that has already signalled a Connection error is still in use! (com.mchange.v2.c3p0.impl.NewPooledConnection:220)
[2024-11-08 09:37:45,910] WARN [sink-opr-db-lst-notifications-connector|task-0] [c3p0] Another error has occurred [ org.postgresql.util.PSQLException: This connection has been closed. ] which will not be reported to listeners! (com.mchange.v2.c3p0.impl.NewPooledConnection:223)
[2024-11-08 09:37:45,910] WARN [sink-opr-db-lst-notifications-connector|task-0] [c3p0] A PooledConnection that has already signalled a Connection error is still in use! (com.mchange.v2.c3p0.impl.NewPooledConnection:220)
[2024-11-08 09:37:45,910] INFO [sink-opr-db-lst-notifications-connector|task-0] Closing session. (io.debezium.connector.jdbc.JdbcChangeEventSink:247)
[2024-11-08 09:37:45,909] ERROR [sink-opr-db-lst-notifications-connector|task-0] WorkerSinkTask{id=sink-opr-db-lst-notifications-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:212)
[2024-11-08 09:37:45,904] WARN [sink-opr-db-lst-notifications-connector|task-0] WorkerSinkTask{id=sink-opr-db-lst-notifications-connector-0} Ignoring invalid task provided offset NOTIFICATIONS-5/OffsetAndMetadata{offset=8459991, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[<here was too many partitions>] (org.apache.kafka.connect.runtime.WorkerSinkTask:467)
[2024-11-08 09:37:45,903] WARN [sink-opr-db-lst-notifications-connector|task-0] WorkerSinkTask{id=sink-opr-db-lst-notifications-connector-0} Ignoring invalid task provided offset NOTIFICATIONS_DTL-3/OffsetAndMetadata{offset=25765463, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[<here was too many partitions>] (org.apache.kafka.connect.runtime.WorkerSinkTask:467)
[2024-11-08 09:37:45,903] WARN [sink-opr-db-lst-notifications-connector|task-0] WorkerSinkTask{id=sink-opr-db-lst-notifications-connector-0} Ignoring invalid task provided offset NOTIFICATIONS_DTL-1/OffsetAndMetadata{offset=45740880, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[<here was too many partitions>] (org.apache.kafka.connect.runtime.WorkerSinkTask:467)
[2024-11-08 09:37:45,903] WARN [sink-opr-db-lst-notifications-connector|task-0] WorkerSinkTask{id=sink-opr-db-lst-notifications-connector-0} Ignoring invalid task provided offset NOTIFICATIONS-4/OffsetAndMetadata{offset=8455456, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[<here was too many partitions>] (org.apache.kafka.connect.runtime.WorkerSinkTask:467)
[2024-11-08 09:37:45,903] WARN [sink-opr-db-lst-notifications-connector|task-0] WorkerSinkTask{id=sink-opr-db-lst-notifications-connector-0} Ignoring invalid task provided offset NOTIFICATIONS_DTL-4/OffsetAndMetadata{offset=25772499, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[<here was too many partitions>] (org.apache.kafka.connect.runtime.WorkerSinkTask:467)
[2024-11-08 09:37:45,903] WARN [sink-opr-db-lst-notifications-connector|task-0] WorkerSinkTask{id=sink-opr-db-lst-notifications-connector-0} Ignoring invalid task provided offset NOTIFICATIONS_DTL-0/OffsetAndMetadata{offset=45728813, leaderEpoch=null, metadata=''} -- partition not assigned, assignment=[<here was too many partitions>] (org.apache.kafka.connect.runtime.WorkerSinkTask:467)
[2024-11-08 09:37:45,898] ERROR [sink-opr-db-lst-notifications-connector|task-0] WorkerSinkTask{id=sink-opr-db-lst-notifications-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: JDBC sink connector failure (org.apache.kafka.connect.runtime.WorkerSinkTask:630)
[2024-11-08 09:37:45,898] ERROR [sink-opr-db-lst-notifications-connector|task-0] JDBC sink connector failure (io.debezium.connector.jdbc.JdbcSinkConnectorTask:95)
[2024-11-08 09:37:45,896] DEBUG [sink-opr-db-lst-notifications-connector|task-0] Rewinding topic NOTIFICATIONS offset to 8455456. (io.debezium.connector.jdbc.JdbcSinkConnectorTask:205)
[2024-11-08 09:37:45,896] DEBUG [sink-opr-db-lst-notifications-connector|task-0] Rewinding topic NOTIFICATIONS offset to 8459991. (io.debezium.connector.jdbc.JdbcSinkConnectorTask:205)
[2024-11-08 09:37:45,896] DEBUG [sink-opr-db-lst-notifications-connector|task-0] Rewinding topic NOTIFICATIONS_DTL offset to 25772499. (io.debezium.connector.jdbc.JdbcSinkConnectorTask:205)
[2024-11-08 09:37:45,896] DEBUG [sink-opr-db-lst-notifications-connector|task-0] Rewinding topic NOTIFICATIONS_DTL offset to 25765463. (io.debezium.connector.jdbc.JdbcSinkConnectorTask:205)
[2024-11-08 09:37:45,896] DEBUG [sink-opr-db-lst-notifications-connector|task-0] Rewinding topic NOTIFICATIONS_DTL offset to 45740880. (io.debezium.connector.jdbc.JdbcSinkConnectorTask:205)
[2024-11-08 09:37:45,896] DEBUG [sink-opr-db-lst-notifications-connector|task-0] Rewinding topic NOTIFICATIONS_DTL offset to 45728813. (io.debezium.connector.jdbc.JdbcSinkConnectorTask:205)
[2024-11-08 09:37:45,895] ERROR [sink-opr-db-lst-notifications-connector|task-0] Failed to process record: Failed to process a sink record (io.debezium.connector.jdbc.JdbcSinkConnectorTask:112)
[2024-11-08 09:37:45,895] ERROR [sink-opr-db-lst-notifications-connector|task-0] An I/O error occurred while sending to the backend. (org.hibernate.engine.jdbc.spi.SqlExceptionHelper:150)
[2024-11-08 09:37:45,894] WARN [sink-opr-db-lst-notifications-connector|task-0] SQL Error: 0, SQLState: 08006 (org.hibernate.engine.jdbc.spi.SqlExceptionHelper:145)
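Since the log lines above are listed newest first, the failure sequence is easier to follow when read in chronological order. Below is a minimal sketch that reorders such lines by their timestamp. The function name `chronological` and the shortened sample lines are only for illustration; it assumes every line starts with a `[YYYY-MM-DD HH:MM:SS,mmm] LEVEL` prefix as in the logs above.

```python
import re
from datetime import datetime

# Matches the "[2024-11-08 09:37:45,894] WARN" prefix used by the Connect worker log.
TS_RE = re.compile(r"^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\] (\w+)")


def chronological(lines):
    """Return (timestamp, level, line) tuples sorted oldest first.

    The input is assumed to be newest first, so it is reversed before the
    stable sort; lines sharing a timestamp then keep their most likely order.
    Lines without a timestamp prefix are skipped.
    """
    parsed = []
    for line in reversed(list(lines)):
        m = TS_RE.match(line)
        if m:
            ts = datetime.strptime(m.group(1), "%Y-%m-%d %H:%M:%S,%f")
            parsed.append((ts, m.group(2), line))
    return sorted(parsed, key=lambda item: item[0])


# Shortened versions of three lines from the log above (hypothetical sample).
SAMPLE = [
    "[2024-11-08 09:37:45,913] INFO [task-0] Closing the session factory",
    "[2024-11-08 09:37:45,895] ERROR [task-0] An I/O error occurred while sending to the backend.",
    "[2024-11-08 09:37:45,894] WARN [task-0] SQL Error: 0, SQLState: 08006",
]

if __name__ == "__main__":
    for ts, level, line in chronological(SAMPLE):
        print(line)
```

Read this way, the first event is the `SQLState: 08006` warning, followed by the I/O error, the task being killed, and only then the c3p0 warnings and the consumer shutdown.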
Stacktrace:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:96)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
... 11 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:229)
at io.debezium.connector.jdbc.JdbcChangeEventSink.lambda$flushBuffers$2(JdbcChangeEventSink.java:207)
at java.base/java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffers(JdbcChangeEventSink.java:207)
at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:159)
at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:103)
... 12 more
Caused by: org.hibernate.exception.JDBCConnectionException: error executing work [An I/O error occurred while sending to the backend.] [n/a]
at org.hibernate.exception.internal.SQLStateConversionDelegate.convert(SQLStateConversionDelegate.java:100)
at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:58)
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:108)
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:94)
at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.coordinateWork(JdbcCoordinatorImpl.java:308)
at org.hibernate.internal.AbstractSharedSessionContract.doWork(AbstractSharedSessionContract.java:1053)
at org.hibernate.internal.AbstractSharedSessionContract.doReturningWork(AbstractSharedSessionContract.java:1049)
at io.debezium.connector.jdbc.JdbcChangeEventSink.hasTable(JdbcChangeEventSink.java:287)
at io.debezium.connector.jdbc.JdbcChangeEventSink.checkAndApplyTableChangesIfNeeded(JdbcChangeEventSink.java:256)
at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:218)
... 17 more
Caused by: org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:395)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:498)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:415)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:335)
at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:321)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:297)
at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:246)
at org.postgresql.jdbc.PgDatabaseMetaData.getTables(PgDatabaseMetaData.java:1347)
at com.mchange.v2.c3p0.impl.NewProxyDatabaseMetaData.getTables(NewProxyDatabaseMetaData.java:2938)
at io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect.tableExists(GeneralDatabaseDialect.java:151)
at io.debezium.connector.jdbc.dialect.postgres.PostgresDatabaseDialect.tableExists(PostgresDatabaseDialect.java:69)
at io.debezium.connector.jdbc.JdbcChangeEventSink.lambda$hasTable$3(JdbcChangeEventSink.java:287)
at org.hibernate.jdbc.WorkExecutor.executeReturningWork(WorkExecutor.java:58)
at org.hibernate.internal.AbstractSharedSessionContract.lambda$doReturningWork$5(AbstractSharedSessionContract.java:1049)
at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.coordinateWork(JdbcCoordinatorImpl.java:303)
... 22 more
Caused by: java.io.EOFException
at org.postgresql.core.PGStream.receiveChar(PGStream.java:467)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2155)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:368)
... 36 more
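To make the chain of causes in the stack trace easier to read, here is a small sketch that extracts the "Caused by:" lines and returns the innermost one. The helper names `cause_chain` and `root_cause` are hypothetical, and the sample is a shortened copy of the trace above. For this trace the innermost cause is `java.io.EOFException` raised in `PGStream.receiveChar`, meaning the driver reached end-of-stream while reading the reply, which as far as I understand suggests the connection was closed from the other side or by something in between.

```python
import re

# Matches lines such as "Caused by: java.io.EOFException" or
# "Caused by: org.postgresql.util.PSQLException: An I/O error occurred ...".
CAUSE_RE = re.compile(r"^Caused by: (?P<cls>[\w.$]+)(?:: (?P<msg>.*))?$")


def cause_chain(trace):
    """Return (exception class, message) pairs for each 'Caused by:' line, outermost first."""
    chain = []
    for line in trace.splitlines():
        m = CAUSE_RE.match(line.strip())
        if m:
            chain.append((m.group("cls"), m.group("msg") or ""))
    return chain


def root_cause(trace):
    """Return the innermost (class, message) pair, or None if there is no 'Caused by:' line."""
    chain = cause_chain(trace)
    return chain[-1] if chain else None


# Shortened copy of the stack trace above (most "at ..." frames omitted).
SAMPLE = """\
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)
Caused by: org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
Caused by: org.hibernate.exception.JDBCConnectionException: error executing work [An I/O error occurred while sending to the backend.] [n/a]
Caused by: org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.
Caused by: java.io.EOFException
at org.postgresql.core.PGStream.receiveChar(PGStream.java:467)
"""

if __name__ == "__main__":
    for cls, msg in cause_chain(SAMPLE):
        print(cls, "|", msg)
```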
For now it looks to me like the problem lies somewhere between the connector and the DB, but I want to be sure: is there something I'm missing? What could potentially cause this problem, and where else can I look for it?