Serge

Reputation: 53

Unexpected deadlocks in PostgreSQL (while using psycopg2)


I am dealing with a deadlock issue in PostgreSQL that I do not understand.
I am trying to implement a Round Robin-like algorithm using Python, psycopg2 module, and a Postgres database.
I want several instances of an application to do the following:
- Lock the whole table with a list of tasks for a very short interval
- Pick a task to perform (least recently performed task, with some limitations)
- Label task so other instances do not pick it (only one instance is allowed to perform the same task at the same time)
- Unlock table
- Perform task
- Repeat
Other sessions should also be able to update certain fields of this table.
Suddenly, I am getting deadlocks that I cannot explain. I have simplified my Python script as much as I could, and I commit after every statement (where possible), but a deadlock still appears every now and then.
For some reason, every time I get a deadlock, it is the first statement in a transaction. How is it even possible? My table doesn’t have any triggers, or foreign key constraints, or anything that would make things complicated. The only explanation I can come up with is that PostgreSQL does not release the lock immediately after the commit. Or perhaps it is psycopg2 that is not working the way I expect it to? I have failed to reproduce the issue by manually running statements in different sessions.
Deadlocks are rare, but I do get them at least once every few hours.

I am running on PostgreSQL 9.6.1 and Python 2.7.12

Here is the code I run (this is just a simplified sample I made in order to catch the issue):

import psycopg2
import sys
import datetime
import time
sys.path.append('/opt/workflow/lib')
import config
import ovs_lib


instance_type='scan_master'
instance_id=sys.argv[1]

dbh = psycopg2.connect(dbname=config.values['pgsql']['db'],
                       host=config.values['pgsql']['host'],
                       port=int(config.values['pgsql']['port']),
                       user=config.values['pgsql']['user'],
                       password=config.values['pgsql']['pass'])
dbh.set_session(isolation_level='READ COMMITTED', autocommit=False)
cursor = dbh.cursor()
cursor.execute("SET search_path TO "+config.values['pgsql']['schema'])

def sanitize(string):
  string=string.replace("'","''")
  return string

def get_task(instance_id):
  task_id=None
  out_struct={}
  instance_id=sanitize(instance_id)
  #Lock whole table
  dbh.commit() #Just in case
  cursor.execute("SELECT 1 FROM wf_task FOR UPDATE") #Lock the table
  cursor.execute("UPDATE wf_task SET scanner_instance_id=null WHERE scanner_instance_id='"+instance_id+"'") #release task from previous run
  #Now get the task
  sql ="SELECT t.task_id, st.scanner_function, t.parallel_runs\n"
  sql+="FROM wf_task t\n"
  sql+="JOIN wf_scanner_type st ON t.scanner_type_id=st.scanner_type_id\n"
  sql+="WHERE status='A'\n"
  sql+="AND t.scanner_instance_id is NULL\n"
  sql+="AND last_scan_ts<=now()-scan_interval*interval '1 second'\n"
  sql+="ORDER BY last_scan_ts\n"
  sql+="LIMIT 1\n"
  cursor.execute(sql)
  cnt=cursor.rowcount
  if cnt>0:
    row=cursor.fetchone()
    task_id=row[0]
    sql ="UPDATE wf_task SET scanner_instance_id='"+instance_id+"',last_scan_ts=current_timestamp(3) WHERE task_id="+str(task_id)
    cursor.execute(sql)
    scanner_function=row[1]
    parallel_runs=row[2]
    out_struct['task_id']=task_id
    out_struct['scanner_function']=scanner_function
    out_struct['parallel_runs']=parallel_runs
  dbh.commit()
  return out_struct

def process_task(task_id):
  sql="UPDATE wf_task SET submitted_ts=now() WHERE task_id="+str(task_id)+" AND submitted_ts<now()"
  cursor.execute(sql)
  dbh.commit()
  sql="UPDATE wf_task SET executed_ts=now() WHERE task_id="+str(task_id)+" AND submitted_ts<now()"
  cursor.execute(sql)
  dbh.commit()

while True:
  if not ovs_lib.check_control(instance_type, instance_id):
    now_time=datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S')
    print now_time+" Stop signal received"
    exit(0)
  task_struct=get_task(instance_id)
  if 'task_id' not in task_struct:
    time.sleep(1)
    continue
  process_task(task_struct['task_id'])

And here are examples of the error I get:

Traceback (most recent call last):
  File "/opt/workflow/bin/scan_simple.py", line 70, in <module>
process_task(task_struct['task_id'])
  File "/opt/workflow/bin/scan_simple.py", line 58, in process_task
cursor.execute(sql)
psycopg2.extensions.TransactionRollbackError: deadlock detected
DETAIL:  Process 21577 waits for ShareLock on transaction 39243027; blocked by process 21425.
Process 21425 waits for ShareLock on transaction 39243029; blocked by process 21102.
Process 21102 waits for AccessExclusiveLock on tuple (8,12) of relation 39933 of database 16390; blocked by process 21577.
HINT:  See server log for query details.
CONTEXT:  while updating tuple (8,12) in relation "wf_task"

Traceback (most recent call last):
  File "/opt/workflow/bin/scan_simple.py", line 66, in <module>
    task_struct=get_task(instance_id)
  File "/opt/workflow/bin/scan_simple.py", line 27, in get_task
    cursor.execute("SELECT 1 FROM wf_task FOR UPDATE")
psycopg2.extensions.TransactionRollbackError: deadlock detected
DETAIL:  Process 21776 waits for ShareLock on transaction 39488839; blocked by process 21931.
Process 21931 waits for ShareLock on transaction 39488844; blocked by process 21776.
HINT:  See server log for query details.
CONTEXT:  while locking tuple (17,9) in relation "wf_task"

At that time I had 6 instances of this script running simultaneously. No other sessions were active in the database.

Later update
Today I learned something new about Postgres that is very relevant to this question.
Starting with version 9.5, PostgreSQL supports a SKIP LOCKED clause, which solves the problem I was trying to design my application around, and in a very elegant manner.
If you are struggling with concurrency in PostgreSQL while trying to implement some sort of queue or round-robin solution, you absolutely must read this:
https://blog.2ndquadrant.com/what-is-select-skip-locked-for-in-postgresql-9-5/
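To make this concrete, here is a minimal sketch of how the task-claiming step could look with FOR UPDATE SKIP LOCKED. It is simplified (it drops the wf_scanner_type join and returns only the task id); the claim_task helper name and the cursor-injection style are my own framing, and table/column names are taken from the question.

```python
def claim_task(cursor, instance_id):
    # Atomically pick and label the least recently scanned eligible task.
    # FOR UPDATE SKIP LOCKED (PostgreSQL 9.5+) makes concurrent workers
    # skip rows already locked by other sessions instead of blocking on
    # them, so no table-wide lock and no deadlock-prone lock ordering.
    cursor.execute("""
        UPDATE wf_task
        SET scanner_instance_id = %s, last_scan_ts = current_timestamp(3)
        WHERE task_id = (
            SELECT t.task_id
            FROM wf_task t
            WHERE t.status = 'A'
              AND t.scanner_instance_id IS NULL
              AND last_scan_ts <= now() - scan_interval * interval '1 second'
            ORDER BY t.last_scan_ts
            LIMIT 1
            FOR UPDATE SKIP LOCKED
        )
        RETURNING task_id
    """, (instance_id,))
    row = cursor.fetchone()
    return row[0] if row else None
```

Note that the parameter placeholder (%s) also replaces the hand-rolled sanitize() escaping from the original script.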

Upvotes: 5

Views: 14564

Answers (1)

Laurenz Albe

Reputation: 247585

The problem is probably that the sequential scan in the first SELECT ... FOR UPDATE doesn't always return the rows in the same order, so concurrent executions of this statement lock the rows of the table in different orders. This leads to the deadlock you experience.
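A commonly used variation on this theme (not part of the answer's own list, and assuming task_id is the primary key) is to force a deterministic lock order with ORDER BY; a sketch:

```python
def lock_all_tasks(cursor):
    # With ORDER BY, every session acquires the row locks in the same
    # (primary-key) order, so two sessions can no longer each hold a
    # lock the other one wants -- the classic lock-ordering deadlock.
    cursor.execute("SELECT task_id FROM wf_task ORDER BY task_id FOR UPDATE")
    return [row[0] for row in cursor.fetchall()]
```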

There are several solutions, in increasing order of preference:

  • I think that locking the whole table for this update is terrible for performance, but if you insist on keeping your code, you could set synchronize_seqscans to off so that all sequential scans return the rows in the same order. But you really shouldn't lock all rows in a table the way you do, because

    • It causes an unnecessary sequential scan.

    • It is not safe. Somebody could INSERT new rows between the time where you lock the rows and the time where you run your UPDATEs.

  • If you really want to lock the whole table, use the LOCK TABLE statement instead of locking all rows in the table. That will get rid of the deadlock as well.
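    A minimal sketch of that suggestion, with psycopg2 as in the question (the with_table_lock helper is mine, and EXCLUSIVE is one possible lock mode choice):

```python
def with_table_lock(dbh, cursor, work):
    # Take a single table-level lock instead of locking every row.
    # EXCLUSIVE mode blocks other writers (and other LOCK TABLE callers)
    # but still allows plain SELECTs; the lock is held until COMMIT.
    cursor.execute("LOCK TABLE wf_task IN EXCLUSIVE MODE")
    result = work(cursor)   # pick and label the task here
    dbh.commit()            # COMMIT releases the table lock
    return result
```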

  • The best solution is probably to lock the rows with the UPDATE itself. To avoid deadlocks, examine the execution plans that PostgreSQL uses for the UPDATE. This will be an index scan or a sequential scan. With an index scan you are safe, because that will return the rows in a certain order. For a sequential scan, disable the synchronize_seqscans feature mentioned above, ideally only for the transaction:

    START TRANSACTION;
    SET LOCAL synchronize_seqscans = off;
    /* your UPDATEs go here */
    COMMIT;
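    In psycopg2 terms (matching the question's autocommit=False setup), the same transaction could look like this sketch; the function name and the single UPDATE are illustrative only:

```python
def update_with_stable_scan_order(dbh, cursor, instance_id, task_id):
    # With autocommit off, psycopg2 opens a transaction on the first
    # statement, so SET LOCAL affects only this transaction and is
    # reset automatically at COMMIT.
    cursor.execute("SET LOCAL synchronize_seqscans = off")
    cursor.execute(
        "UPDATE wf_task SET scanner_instance_id = %s WHERE task_id = %s",
        (instance_id, task_id),
    )
    dbh.commit()
```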
    

Upvotes: 3
