pcb

Reputation: 63

Postgres Replication Slots Checking Lag

I'm trying to detect whether the three logical replication slots on my AWS RDS Aurora Postgres 11.9 instance are backing up. I read from them continuously using the wal2json plugin. Two of the slots are read by Python processes; the third is read by a kafka-connect consumer.

I'm using the query below, but I'm getting odd results. It says two of my slots are several GB behind, even in the middle of the night when load is very low. Am I misinterpreting what the query is saying?

SELECT redo_lsn, slot_name,restart_lsn, 
    round((redo_lsn-restart_lsn) / 1024 / 1024 / 1024, 2) AS GB_behind 
    FROM pg_control_checkpoint(), pg_replication_slots;

Things I've checked:

  1. I've checked that the consumers are still running.
  2. I've looked at the logs: rows come off the database within 0-2 seconds of being inserted, so the consumers don't appear to be lagging.
  3. I've performed an end-to-end test and the data makes it through my pipeline in a few seconds, so it is definitely being consumed quickly.
  4. Both slots read by my Python processes show the same value for GB_behind, currently 12.40, even though they are on different logical databases with dramatically different load (one is ~1000x higher).
  5. The third replication slot, read by a different program (kafka connect), shows 0 GB_behind.

There is just no way, even at peak load, that my workloads could generate 12.4 GB of data in a few seconds (or even in a few minutes). Am I misinterpreting something? Is there a better way to check how far behind a replication slot is?

Thanks much!

Here is a small snippet of my code (Python 3.6) in case it helps. I've been using it for a while and data has been flowing through it:

def consume(msg):
    print(msg.payload)
    try:
        kinesis_client.put_record(StreamName=STREAM_NAME, Data=msg.payload, PartitionKey=partition_key)
    except:
        logger.exception('PG ETL: Failed to send load to kinesis. Likely too large.')

with con.cursor() as cur:
    cur.start_replication(slot_name=replication_slot, options = {'pretty-print' : 1}, decode=True)
    cur.consume_stream(consume)

Upvotes: 2

Views: 4624

Answers (1)

pcb

Reputation: 63

I wasn't calling send_feedback in my consume function. So I was consuming the records, but I wasn't telling the Postgres replication slot that I had consumed them.
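To see whether feedback is reaching the server, one option is to compare each slot's confirmed_flush_lsn against the current WAL position instead of looking only at restart_lsn. The sketch below is illustrative and was not part of my original setup: the helper names lsn_to_int and lag_bytes are made up for this example, and the query assumes pg_current_wal_lsn() is available on your instance (it is in stock Postgres 10+). Postgres can also do the subtraction itself with pg_wal_lsn_diff.

```python
# Query that returns the positions to compare (run it with any client).
SLOT_LAG_SQL = """
SELECT slot_name,
       pg_current_wal_lsn() AS current_lsn,
       confirmed_flush_lsn,
       restart_lsn
FROM pg_replication_slots
WHERE slot_type = 'logical';
"""


def lsn_to_int(lsn: str) -> int:
    """Convert a textual pg_lsn such as '16/B374D848' to a byte position.

    The text form is two hexadecimal numbers: the high 32 bits, a slash,
    then the low 32 bits.
    """
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def lag_bytes(current_lsn: str, slot_lsn: str) -> int:
    """Bytes of WAL between the server's current position and the slot's."""
    return lsn_to_int(current_lsn) - lsn_to_int(slot_lsn)
```

If confirmed_flush_lsn stays put while the consumer is clearly receiving rows, the consumer is probably reading without acknowledging.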

Here is my complete consume function, in case others are interested:

def consume(msg):
    print(msg.payload)
    try:
        kinesis_client.put_record(StreamName=STREAM_NAME, Data=msg.payload, PartitionKey=partition_key)
    except Exception:
        # Log the failure and carry on; the message is still acknowledged below.
        logger.exception('PG ETL: Failed to send load to kinesis. Likely too large.')

    # Tell Postgres this message has been consumed so the slot can move forward.
    msg.cursor.send_feedback(flush_lsn=msg.data_start)

with con.cursor() as cur:
    cur.start_replication(slot_name=replication_slot, options={'pretty-print': 1}, decode=True)
    cur.consume_stream(consume)
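Note that the function above acknowledges a message even when the Kinesis write fails, which suits my case because oversized records are simply dropped. If you would rather not acknowledge undelivered records, a variation is to send feedback only after delivery succeeds. This is a sketch of that alternative, not what I run: make_consumer and deliver are hypothetical names, and deliver stands in for whatever sends the payload downstream.

```python
import logging

logger = logging.getLogger(__name__)


def make_consumer(deliver):
    """Build a consume_stream callback around a deliver(payload) callable."""

    def consume(msg):
        try:
            deliver(msg.payload)
        except Exception:
            # Do not acknowledge: re-raising breaks out of consume_stream,
            # and the unacknowledged data can be re-read after a restart.
            logger.exception("Delivery failed; LSN %s not acknowledged", msg.data_start)
            raise
        # Acknowledge only after the payload was delivered successfully.
        msg.cursor.send_feedback(flush_lsn=msg.data_start)

    return consume
```

It would be passed to the cursor as cur.consume_stream(make_consumer(my_deliver_function)). The trade-off is that a single record that can never be delivered will stop the consumer until it is handled some other way.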

Upvotes: 4
