I'm using Kafka Connect with the Debezium PostgreSQL connector to stream data changes, and I'm encountering an error that causes the task to fail and not recover automatically. The error seems related to a prepared statement conflict in PostgreSQL. Below is the relevant portion of the log:
2024-10-08 12:04:39 2024-10-08 05:04:39,928 ERROR || WorkerSourceTask{id=postgres-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
2024-10-08 12:04:39 org.apache.kafka.connect.errors.ConnectException: Database connection failed during resolving unknown type
2024-10-08 12:04:39 at io.debezium.connector.postgresql.TypeRegistry.resolveUnknownType(TypeRegistry.java:404)
2024-10-08 12:04:39 at io.debezium.connector.postgresql.TypeRegistry.get(TypeRegistry.java:179)
2024-10-08 12:04:39 at io.debezium.connector.postgresql.PostgresType$Builder.build(PostgresType.java:304)
2024-10-08 12:04:39 at io.debezium.connector.postgresql.TypeRegistry.prime(TypeRegistry.java:338)
2024-10-08 12:04:39 at io.debezium.connector.postgresql.TypeRegistry.<init>(TypeRegistry.java:125)
2024-10-08 12:04:39 at io.debezium.connector.postgresql.connection.PostgresConnection.<init>(PostgresConnection.java:106)
2024-10-08 12:04:39 at io.debezium.connector.postgresql.PostgresConnectorTask.lambda$start$1(PostgresConnectorTask.java:95)
2024-10-08 12:04:39 at io.debezium.jdbc.DefaultMainConnectionProvidingConnectionFactory.<init>(DefaultMainConnectionProvidingConnectionFactory.java:16)
2024-10-08 12:04:39 at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:94)
2024-10-08 12:04:39 at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:141)
2024-10-08 12:04:39 at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:280)
2024-10-08 12:04:39 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
2024-10-08 12:04:39 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
2024-10-08 12:04:39 at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
2024-10-08 12:04:39 at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
2024-10-08 12:04:39 at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2024-10-08 12:04:39 at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2024-10-08 12:04:39 at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2024-10-08 12:04:39 at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2024-10-08 12:04:39 at java.base/java.lang.Thread.run(Thread.java:829)
2024-10-08 12:04:39 Caused by: org.postgresql.util.PSQLException: ERROR: prepared statement "S_1" already exists
2024-10-08 12:04:39 at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2713)
2024-10-08 12:04:39 at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2401)
2024-10-08 12:04:39 at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:368)
2024-10-08 12:04:39 at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:498)
2024-10-08 12:04:39 at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:415)
2024-10-08 12:04:39 at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:190)
2024-10-08 12:04:39 at org.postgresql.jdbc.PgPreparedStatement.executeQuery(PgPreparedStatement.java:134)
2024-10-08 12:04:39 at io.debezium.connector.postgresql.TypeRegistry.loadType(TypeRegistry.java:409)
2024-10-08 12:04:39 at io.debezium.connector.postgresql.TypeRegistry.resolveUnknownType(TypeRegistry.java:400)
2024-10-08 12:04:39 ... 19 more
My Docker Compose file:
version: '2'
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: quay.io/debezium/kafka:${DEBEZIUM_VERSION}
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    links:
      - kafka
    ports:
      - 9089:8080
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
  postgres:
    image: quay.io/debezium/example-postgres:${DEBEZIUM_VERSION}
    ports:
      - 6543:5432
    environment:
      - POSTGRES_USER=***
      - POSTGRES_PASSWORD=***
  connect:
    image: quay.io/debezium/connect:${DEBEZIUM_VERSION}
    ports:
      - 8083:8083
    links:
      - kafka
      - postgres
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
My connect.json file:
{
  "name": "postgres-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "decimal.handling.mode": "string",
    "tasks.max": "1",
    "database.hostname": "***",
    "database.port": "***",
    "database.user": "***",
    "database.password": "****",
    "database.dbname": "postgres",
    "topic.prefix": "dbserver1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "snapshot.mode": "always",
    "table.include.list": "public.clinics",
    "database.slot.name": "debezium_slot"
  }
}
Has anyone encountered this error before, or does anyone know what might cause Debezium to conflict with prepared statements in PostgreSQL?
What I have tried so far: I checked PostgreSQL for cached prepared statements and found no long-running statements. I restarted the connector, but the error recurs every time. I also searched for known issues with PostgreSQL type resolution, but I haven't found a clear answer yet.
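For anyone trying to reproduce the restart step, here is a minimal sketch of reading the task state and restarting failed tasks through the Kafka Connect REST API. It assumes the REST API is reachable at http://localhost:8083 (the port published in the compose file above) and that the connector is named postgres-connector as in connect.json; the helper names are made up for illustration. It only automates the manual restart, so it will not fix the underlying prepared statement error.

```python
import json
import urllib.request

# Assumptions: Connect's REST API is on the port published in the compose
# file, and the connector name matches the "name" field in connect.json.
CONNECT_URL = "http://localhost:8083"
CONNECTOR = "postgres-connector"


def failed_tasks(status):
    """Return (task_id, first line of trace) for each FAILED task in a status payload."""
    failed = []
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            trace = task.get("trace", "")
            first_line = trace.splitlines()[0] if trace else ""
            failed.append((task["id"], first_line))
    return failed


def get_status(base_url, connector):
    """GET /connectors/{name}/status and decode the JSON body."""
    url = f"{base_url}/connectors/{connector}/status"
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)


def restart_task(base_url, connector, task_id):
    """POST /connectors/{name}/tasks/{id}/restart and return the HTTP status code."""
    url = f"{base_url}/connectors/{connector}/tasks/{task_id}/restart"
    req = urllib.request.Request(url, method="POST")
    with urllib.request.urlopen(req) as resp:
        return resp.status


def check_and_restart(base_url=CONNECT_URL, connector=CONNECTOR):
    """Print the failure reason of every failed task, then ask Connect to restart it."""
    for task_id, reason in failed_tasks(get_status(base_url, connector)):
        print(f"task {task_id} failed: {reason}")
        restart_task(base_url, connector, task_id)
```

Calling check_and_restart() against a running Connect worker prints the first line of each failed task's trace before restarting it, which makes it easy to confirm whether the same ConnectException comes back after every restart.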