zombiehugs

Postgres WAL Lag with Debezium CDC Source Connector, requires connector restart to fix

First of all, thanks for all the hard work and support you guys provide. I'm reaching out because I've exhausted every effort to diagnose an issue with the Debezium Postgres CDC Source Connector (1.4.1), hosted in Confluent Kafka Connect (6.0.x). At random times throughout the day the connector seemingly stalls and the Postgres WAL lag starts to climb. For the most part the connector does not recover on its own, and a restart of the connector task is required.
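One common way to watch this WAL lag is as the distance between the server's current WAL position and the slot's confirmed_flush_lsn. On PostgreSQL 10 and later, `SELECT slot_name, pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) FROM pg_replication_slots;` reports it in bytes. The Java sketch below only reproduces that subtraction on textual LSN values (such as those the query's LSN columns return); it is an illustration of the arithmetic, not part of Debezium or pgjdbc, and the sample LSNs are hypothetical.

```java
class WalLag {
    // Parses a textual pg_lsn such as "16/B374D848": the part before the
    // slash is the high 32 bits and the part after it the low 32 bits,
    // both written in hexadecimal.
    static long parseLsn(String lsn) {
        int slash = lsn.indexOf('/');
        if (slash < 0) {
            throw new IllegalArgumentException("not an LSN: " + lsn);
        }
        long high = Long.parseLong(lsn.substring(0, slash), 16);
        long low = Long.parseLong(lsn.substring(slash + 1), 16);
        return (high << 32) | low;
    }

    // WAL lag in bytes: how far the position confirmed by the slot's
    // consumer trails the server's current WAL position.
    static long lagBytes(String currentWalLsn, String confirmedFlushLsn) {
        return parseLsn(currentWalLsn) - parseLsn(confirmedFlushLsn);
    }

    public static void main(String[] args) {
        // Hypothetical sample values, not taken from the question.
        System.out.println(lagBytes("16/B374D848", "16/B3000000"));
    }
}
```

A value that keeps growing while the connector task is still "running" matches the stall described above.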

We ran Java Flight Recorder on the Kafka Connect instance during a problem period and captured this stack trace:

at java.sql.SQLException.setNextException(java.sql.SQLException)
at void java.sql.SQLWarning.setNextWarning(java.sql.SQLWarning) 
at void org.postgresql.core.QueryExecutorBase.addWarning(java.sql.SQLWarning) 
at org.postgresql.core.v3.CopyOperationImpl org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(org.postgresql.core.v3.CopyOperationImpl, boolean) 
at void org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(org.postgresql.core.v3.CopyOperationImpl, boolean) 
at byte[] org.postgresql.core.v3.CopyDualImpl.readFromCopy(boolean) 
at java.nio.ByteBuffer org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(boolean) 
at java.nio.ByteBuffer org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(boolean) 
at java.nio.ByteBuffer org.postgresql.core.v3.replication.V3PGReplicationStream.readPending() 
at boolean io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(io.debezium.connector.postgresql.connection.ReplicationStream$ReplicationMessageProcessor) 
at void io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(io.debezium.pipeline.source.spi.ChangeEventSource$ChangeEventSourceContext, io.debezium.connector.postgresql.connection.ReplicationStream) 
at void io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(io.debezium.pipeline.source.spi.ChangeEventSource$ChangeEventSourceContext) 
at void io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(io.debezium.pipeline.spi.OffsetContext, io.debezium.pipeline.source.spi.ChangeEventSource$ChangeEventSourceContext) 
at void io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0() 
at void io.debezium.pipeline.ChangeEventSourceCoordinator$$Lambda$860.1129257263.run() 
at java.lang.Object java.util.concurrent.Executors$RunnableAdapter.call() 
at void java.util.concurrent.FutureTask.run()
at void java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker)
at void java.util.concurrent.ThreadPoolExecutor$Worker.run()
at void java.lang.Thread.run()

Looking through GitHub, I found the addWarning call shown in the trace. It appears to be triggered when the server sends a message of type 'N' (a NoticeResponse in the PostgreSQL wire protocol) while the replication stream is in copy mode. I've tried googling to better understand what is occurring or what this means, but have come up empty-handed.
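One possible, unconfirmed reading of the trace: it ends in `SQLWarning.setNextWarning`, which delegates to `SQLException.setNextException`, and in OpenJDK that method appends by walking the warning chain from its head to the tail. If notices keep arriving and nothing clears the chain (the trace alone does not show whether pgjdbc's replication path ever does), each append gets slower. The minimal sketch below uses only `java.sql.SQLWarning`, not pgjdbc, to build such a chain; the quadratic visit count is a model under that head-to-tail assumption, not a measurement of the connector.

```java
import java.sql.SQLWarning;

class WarningChainDemo {
    // Appends warnings one at a time with setNextWarning, the same JDK
    // call that appears in the stack trace. n is the final chain length.
    static SQLWarning buildChain(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be at least 1");
        }
        SQLWarning head = new SQLWarning("warning 0");
        for (int i = 1; i < n; i++) {
            head.setNextWarning(new SQLWarning("warning " + i));
        }
        return head;
    }

    // Walks the chain from the head and counts its nodes.
    static int chainLength(SQLWarning head) {
        int count = 0;
        for (SQLWarning w = head; w != null; w = w.getNextWarning()) {
            count++;
        }
        return count;
    }

    // Model: appending to a chain of i nodes visits all i of them, so
    // n - 1 appends cost 1 + 2 + ... + (n - 1) = n(n - 1) / 2 visits.
    static long modeledAppendVisits(int n) {
        return (long) n * (n - 1) / 2;
    }

    public static void main(String[] args) {
        for (int n : new int[] {1_000, 5_000, 10_000}) {
            long start = System.nanoTime();
            SQLWarning head = buildChain(n);
            long ms = (System.nanoTime() - start) / 1_000_000;
            System.out.println(n + " warnings: length " + chainLength(head)
                + ", modeled visits " + modeledAppendVisits(n) + ", " + ms + " ms");
        }
    }
}
```

The timings printed by `main` vary by machine; the point is that they grow much faster than the number of warnings.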

The line in the pgjdbc source where addWarning is called (I think): https://github.com/pgjdbc/pgjdbc/blob/45a771fccd77c1211d8638111ee3e9934849b781/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java#L1217

The problem has existed for some time and we've just been dealing with it.

A few things we've tried:

No logs are written to stdout while the connector is in this unrecoverable state, and I'm not able to debug remotely, so the stack trace above is the only information I have. I'd be curious to know whether anyone else has encountered this issue or is intimately familiar with the Debezium source.

Answers (1)

zombiehugs

Resolved:

We resolved this by adding a filtering rule to the slot.stream.params property in our connector configuration. This tunes the replication slot to filter out unnecessary tables and send only the tables we require; the filtering happens server-side, before the changes are sent to our connector.

For more information about this configuration property you can see it here: https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-property-slot-stream-params

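Example config (for Debezium 1.x, as used in the question; 2.x releases name the topic prefix property topic.prefix instead of database.server.name). The slot.stream.params entry is what enables the server-side filtering. Its add-tables parameter is a wal2json plug-in option, so the config also sets plugin.name to wal2json:

{
  "name": "fulfillment-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "wal2json",
    "database.hostname": "192.168.99.100",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "database.server.name": "fulfillment",
    "table.include.list": "public.inventory",
    "slot.stream.params": "add-tables=public.inventory"
  }
}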
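Because the tables appear twice (once for Debezium's own filter and once for the server-side wal2json filter), adding a table later means updating both. A minimal sketch of a hypothetical Java helper, not part of Debezium, that derives both properties from one table list; it assumes wal2json as the plug-in and plain schema.table names that need no escaping, and public.orders in the usage is a made-up table name.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SlotFilterConfig {
    // Hypothetical helper: builds the connector-side filter
    // (table.include.list) and the server-side wal2json filter
    // (add-tables inside slot.stream.params) from the same list,
    // so the two cannot drift apart.
    static Map<String, String> filterProperties(List<String> tables) {
        if (tables.isEmpty()) {
            throw new IllegalArgumentException("at least one table is required");
        }
        // Both properties take comma-separated schema.table names.
        String joined = String.join(",", tables);
        Map<String, String> props = new LinkedHashMap<>();
        props.put("plugin.name", "wal2json");
        props.put("table.include.list", joined);
        props.put("slot.stream.params", "add-tables=" + joined);
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> props =
            filterProperties(List.of("public.inventory", "public.orders"));
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

slot.stream.params takes semicolon-separated parameters, so any further wal2json options would be appended after a `;`.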
