Reputation: 63
I'm trying to detect whether the three logical replication slots on my AWS RDS Aurora Postgres 11.9 instance are backing up. I read from them continuously using the wal2json plugin. Two of the slots are read by Python processes; the third is read by a kafka-connect consumer.
I'm using the query below, but am getting odd results. It says two of my slots are several GB behind, even in the middle of the night when we have very little load. Am I misinterpreting what the query is saying?
SELECT redo_lsn, slot_name, restart_lsn,
       round((redo_lsn - restart_lsn) / 1024 / 1024 / 1024, 2) AS GB_behind
FROM pg_control_checkpoint(), pg_replication_slots;
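For anyone checking the arithmetic: a pg_lsn is a 64-bit byte position in the WAL, printed as two hexadecimal halves, so subtracting one from another gives a byte count. Below is a minimal Python sketch of the same calculation. The helper names lsn_to_int and gb_behind are hypothetical, and the LSN values are made up for illustration rather than taken from the instance.

```python
def lsn_to_int(lsn: str) -> int:
    """Convert a textual pg_lsn such as '16/B374D848' to a byte position."""
    high, low = lsn.split('/')
    # The text form is two hexadecimal halves: the upper and lower 32 bits.
    return (int(high, 16) << 32) | int(low, 16)


def gb_behind(newer_lsn: str, older_lsn: str) -> float:
    """Distance between two LSNs in GiB, rounded the same way as the query."""
    diff_bytes = lsn_to_int(newer_lsn) - lsn_to_int(older_lsn)
    return round(diff_bytes / 1024 ** 3, 2)


# Made-up values: the high halves differ by 2, i.e. two 4 GiB steps.
print(gb_behind('3/0', '1/0'))  # 8.0
```

Read this way, GB_behind is only the distance between the last checkpoint's redo location and the slot's restart_lsn. It does not by itself say how much unread data the consumer still has to process.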
Things I've checked:
GB_behind, currently 12.40, even though the two slots are on different logical databases which have dramatically different load (one is ~1000x higher load).
0 GB_behind.
There is just no way, even at peak load, that my workloads could generate 12.4 GB of data in a few seconds (not even in a few minutes). Am I misinterpreting something? Is there a better way to check how far a replication slot is behind?
Thanks much!
Here is a small snippet of my code (Python 3.6) in case it helps. I've been using it for a while and the data has been coming through fine:
def consume(msg):
    print(msg.payload)
    try:
        kinesis_client.put_record(StreamName=STREAM_NAME, Data=msg.payload, PartitionKey=partition_key)
    except:
        logger.exception('PG ETL: Failed to send load to kinesis. Likely too large.')

with con.cursor() as cur:
    cur.start_replication(slot_name=replication_slot, options={'pretty-print': 1}, decode=True)
    cur.consume_stream(consume)
Upvotes: 2
Views: 4624
Reputation: 63
I wasn't properly calling send_feedback in my consume function. So I was consuming the records, but I wasn't telling the Postgres replication slot that I had consumed them. Without that acknowledgement the server cannot advance the slot, so it looked far behind even though the data was flowing.
Here is my complete consume function in case others are interested:
def consume(msg):
    print(msg.payload)
    try:
        kinesis_client.put_record(StreamName=STREAM_NAME, Data=msg.payload, PartitionKey=partition_key)
    except:
        logger.exception('PG ETL: Failed to send load to kinesis. Likely too large.')
    # Tell the server this message has been handled so the slot can advance.
    msg.cursor.send_feedback(flush_lsn=msg.data_start)

with con.cursor() as cur:
    cur.start_replication(slot_name=replication_slot, options={'pretty-print': 1}, decode=True)
    cur.consume_stream(consume)
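One thing to be aware of: as posted, the feedback call appears to run whether or not the send succeeded, so a record that fails to reach Kinesis is still acknowledged. That is fine if dropping oversized records is acceptable. If it is not, one possible variation is to acknowledge only after a successful send and let the error stop the stream. The sketch below is an assumption on my part, not something I run in production: make_consumer and the send callable are hypothetical names, and only msg.payload, msg.data_start, msg.cursor and send_feedback(flush_lsn=...) come from psycopg2.

```python
import logging

logger = logging.getLogger(__name__)


def make_consumer(send):
    """Build a callback suitable for cur.consume_stream().

    `send` is a hypothetical callable that ships one payload downstream
    (for example a thin wrapper around kinesis_client.put_record) and
    raises an exception when the send fails.
    """
    def consume(msg):
        try:
            send(msg.payload)
        except Exception:
            # Log and re-raise WITHOUT acknowledging, so the slot is not
            # advanced past a change that never reached its destination.
            logger.exception('PG ETL: failed to send payload downstream.')
            raise
        # Only reached on success: report this position as flushed so the
        # server can advance the replication slot.
        msg.cursor.send_feedback(flush_lsn=msg.data_start)

    return consume
```

It would be wired in as cur.consume_stream(make_consumer(send_to_kinesis)), where send_to_kinesis is a hypothetical wrapper around the put_record call. Re-raising matters here: if the callback swallowed the error and carried on, the next successful message would acknowledge a later position and skip the failed record anyway.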
Upvotes: 4