Reputation: 1024
Is there a way to run the django tests using multiple threads and force the race condition? I want to make sure that the code path that handles the transaction errors is executed. To be slightly more specific, I want to be able to spawn 2 threads that will try to perform the same action on the database, with one of them succeeding and the other one failing. I'm using the test framework that is in django.
Python pseudocode:
def some_method():
    try:
        with transaction.atomic():
            objectA = get_object_from_db()
            objectA.delete()
    except DatabaseError:
        # error handling code to be run
        ...

class TestClass(TransactionTestCase):
    def test_some_method(self):
        # run two threads and make sure that the race condition was present
        # and that some_method recovered successfully
        ...
Upvotes: 9
Views: 2692
Reputation: 1
A race condition causes two anomalies: lost update or write skew. So with 2 threads, you can test whether lost update or write skew occurs under a given isolation level.
I created 2 sets of code to test lost update and write skew with the default isolation level READ COMMITTED on PostgreSQL, as shown below.
I explain lost update first, then write skew.
First, I created a store_product table with id, name and stock columns with models.py as shown below.

store_product table:

| id | name | stock |
|---|---|---|
| 1 | Apple | 10 |
| 2 | Orange | 20 |
# "store/models.py"
from django.db import models
class Product(models.Model):
name = models.CharField(max_length=30)
stock = models.IntegerField()
Then, I created and ran the test code for lost update as shown below:
# "store/views.py"
from django.db import transaction
from time import sleep
from .models import Person
from threading import Thread
from django.http import HttpResponse
@transaction.atomic
def transaction1(flow):
    while True:
        while True:
            if flow[0] == "Step 1":
                sleep(0.1)
                print("T1", flow[0], "BEGIN")
                flow[0] = "Step 2"
                break
        while True:
            if flow[0] == "Step 2":
                sleep(0.1)
                print("T1", flow[0], "SELECT")
                product = Product.objects.get(id=2)
                print(product.id, product.name, product.stock)
                flow[0] = "Step 3"
                break
        while True:
            if flow[0] == "Step 5":
                sleep(0.1)
                print("T1", flow[0], "UPDATE")
                Product.objects.filter(id=2).update(stock=13)
                flow[0] = "Step 6"
                break
        while True:
            if flow[0] == "Step 6":
                sleep(0.1)
                print("T1", flow[0], "COMMIT")
                flow[0] = "Step 7"
                break
        break
@transaction.atomic
def transaction2(flow):
    while True:
        while True:
            if flow[0] == "Step 3":
                sleep(0.1)
                print("T2", flow[0], "BEGIN")
                flow[0] = "Step 4"
                break
        while True:
            if flow[0] == "Step 4":
                sleep(0.1)
                print("T2", flow[0], "SELECT")
                product = Product.objects.get(id=2)
                print(product.id, product.name, product.stock)
                flow[0] = "Step 5"
                break
        while True:
            if flow[0] == "Step 7":
                sleep(0.1)
                print("T2", flow[0], "UPDATE")
                Product.objects.filter(id=2).update(stock=16)
                flow[0] = "Step 8"
                break
        while True:
            if flow[0] == "Step 8":
                sleep(0.1)
                print("T2", flow[0], "COMMIT")
                break
        break
def call_transcations(request):
    flow = ["Step 1"]
    thread1 = Thread(target=transaction1, args=(flow,), daemon=True)
    thread2 = Thread(target=transaction2, args=(flow,), daemon=True)
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    return HttpResponse("Call_transcations")
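(For completeness, the view has to be reachable from a URL before it can be triggered; the wiring below is my own sketch, not part of the original code, and assumes the app is named store.)

# "store/urls.py" (sketch)
from django.urls import path

from . import views

urlpatterns = [
    path("call-transactions/", views.call_transcations),
]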
Then, lost update occurred, according to the result below on the console, because lost update occurs in the READ COMMITTED isolation level in PostgreSQL:
T1 Step 1 BEGIN
T1 Step 2 SELECT # Reads the same row
2 Orange 20
T2 Step 3 BEGIN
T2 Step 4 SELECT # Reads the same row
2 Orange 20
T1 Step 5 UPDATE # Writes "stock"
T1 Step 6 COMMIT # And commits the write
T2 Step 7 UPDATE # Overwrites "stock"
T2 Step 8 COMMIT # And commits the overwrite
And also, I could get the SQL query logs of PostgreSQL shown below; one way to enable SQL query logging on PostgreSQL is sketched next.
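(One common way to capture logs like these is to enable statement logging in postgresql.conf; a minimal sketch, the exact file location and settings depend on your installation.)

# postgresql.conf (sketch): write every SQL statement to the server log
logging_collector = on
log_statement = 'all'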
[20504]: BEGIN
[20504]: SELECT "store_product"."id", "store_product"."name", "store_product"."stock"
FROM "store_product"
WHERE "store_product"."id" = 2
LIMIT 21
[3840]: BEGIN
[3840]: SELECT "store_product"."id", "store_product"."name", "store_product"."stock"
FROM "store_product"
WHERE "store_product"."id" = 2
LIMIT 21
[20504]: UPDATE "store_product" SET "stock" = 13
WHERE "store_product"."id" = 2
[20504]: COMMIT
[3840]: UPDATE "store_product" SET "stock" = 16
WHERE "store_product"."id" = 2
[3840]: COMMIT
And this table below shows the flow and the SQL query logs of PostgreSQL above:

| Flow | Transaction 1 (T1) | Transaction 2 (T2) | Explanation |
|---|---|---|---|
| Step 1 | BEGIN; | | T1 starts. |
| Step 2 | SELECT "store_product"."id", "store_product"."name", "store_product"."stock" FROM "store_product" WHERE "store_product"."id" = 2 LIMIT 21; 2 Orange 20 | | T1 reads 20 which is updated later to 13 because a customer buys 7 oranges. |
| Step 3 | | BEGIN; | T2 starts. |
| Step 4 | | SELECT "store_product"."id", "store_product"."name", "store_product"."stock" FROM "store_product" WHERE "store_product"."id" = 2 LIMIT 21; 2 Orange 20 | T2 reads 20 which is updated later to 16 because a customer buys 4 oranges. |
| Step 5 | UPDATE "store_product" SET "stock" = 13 WHERE "store_product"."id" = 2; | | T1 updates 20 to 13. |
| Step 6 | COMMIT; | | T1 commits. |
| Step 7 | | UPDATE "store_product" SET "stock" = 16 WHERE "store_product"."id" = 2; | T2 updates 13 to 16 after T1 commits. |
| Step 8 | | COMMIT; | T2 commits. *Lost update occurs.* |
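To tie this back to the original question, the same two functions can also be driven from a Django test instead of a view. The sketch below is my own code, assuming the app is named store, the lost-update versions of transaction1/transaction2 above, and the default READ COMMITTED isolation level; TransactionTestCase is needed because both threads really commit.

# "store/tests.py" (sketch)
from threading import Thread

from django.db import connections
from django.test import TransactionTestCase

from store.models import Product
from store.views import transaction1, transaction2


class LostUpdateTests(TransactionTestCase):
    def test_lost_update(self):
        Product.objects.create(id=2, name="Orange", stock=20)
        flow = ["Step 1"]

        def run(target):
            try:
                target(flow)
            finally:
                # each thread opens its own DB connection; close it so the
                # test database can be torn down cleanly
                for conn in connections.all():
                    conn.close()

        threads = [Thread(target=run, args=(t,)) for t in (transaction1, transaction2)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # under READ COMMITTED the second UPDATE silently wins (lost update)
        self.assertEqual(Product.objects.get(id=2).stock, 16)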
Next, because lost update doesn't occur in the REPEATABLE READ or SERIALIZABLE isolation level in PostgreSQL, I set REPEATABLE READ with psql as shown below:
postgres=# ALTER DATABASE postgres SET DEFAULT_TRANSACTION_ISOLATION TO 'REPEATABLE READ';
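As an aside, if you would rather keep this inside the Django project than change the database default with psql, the isolation level can also be set per connection in settings.py. This is a sketch assuming the psycopg2 backend; ISOLATION_LEVEL_SERIALIZABLE works the same way for the SERIALIZABLE example later.

# "settings.py" (sketch)
import psycopg2.extensions

DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": "postgres",
        # ... USER, PASSWORD, HOST, PORT as usual ...
        "OPTIONS": {
            "isolation_level": psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ,
        },
    }
}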
Then I ran the code above again, and lost update didn't occur, because at T2 Step 7 the update was refused and these 2 exceptions occurred:
T1 Step 1 BEGIN
T1 Step 2 SELECT
2 Orange 20
T2 Step 3 BEGIN
T2 Step 4 SELECT
2 Orange 20
T1 Step 5 UPDATE
T1 Step 6 COMMIT
T2 Step 7 UPDATE # ↓↓ 2 exceptions occurred ↓↓
psycopg2.errors.SerializationFailure: could not serialize access due to concurrent update
django.db.utils.OperationalError: could not serialize access due to concurrent update
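This OperationalError is exactly the kind of failure the error-handling path in the question is meant to cover. If the write should survive it, a small retry wrapper can catch it outside the atomic block; the function below is my own sketch and not part of the flow code above.

# "store/views.py" (sketch): retry a write that failed with a serialization error
from django.db import transaction
from django.db.utils import OperationalError

from .models import Product


def update_stock_with_retry(product_id, new_stock, attempts=3):
    for attempt in range(attempts):
        try:
            with transaction.atomic():
                Product.objects.filter(id=product_id).update(stock=new_stock)
            return
        except OperationalError:
            if attempt == attempts - 1:
                raise  # give up after the last attempt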
And also, the update was refused with 1 error and the 2nd transaction was rolled back, according to the SQL query logs of PostgreSQL below:
[14072]: BEGIN
[14072]: SELECT "store_product"."id", "store_product"."name", "store_product"."stock"
FROM "store_product"
WHERE "store_product"."id" = 2
LIMIT 21
[1834]: BEGIN
[1834]: SELECT "store_product"."id", "store_product"."name", "store_product"."stock"
FROM "store_product"
WHERE "store_product"."id" = 2
LIMIT 21
[14072]: UPDATE "store_product"
SET "stock" = 13
WHERE "store_product"."id" = 2
[14072]: COMMIT
[1834] ERROR: could not serialize access due to concurrent update # Here
[1834] STATEMENT: UPDATE "store_product"
SET "stock" = 16
WHERE "store_product"."id" = 2
[1834]: ROLLBACK # Here
And this table below shows the flow and the SQL query logs of PostgreSQL above:

| Flow | Transaction 1 (T1) | Transaction 2 (T2) | Explanation |
|---|---|---|---|
| Step 1 | BEGIN; | | T1 starts. |
| Step 2 | SELECT "store_product"."id", "store_product"."name", "store_product"."stock" FROM "store_product" WHERE "store_product"."id" = 2 LIMIT 21; 2 Orange 20 | | T1 reads 20 which is updated later to 13 because a customer buys 7 oranges. |
| Step 3 | | BEGIN; | T2 starts. |
| Step 4 | | SELECT "store_product"."id", "store_product"."name", "store_product"."stock" FROM "store_product" WHERE "store_product"."id" = 2 LIMIT 21; 2 Orange 20 | T2 reads 20 which is updated later to 16 because a customer buys 4 oranges. |
| Step 5 | UPDATE "store_product" SET "stock" = 13 WHERE "store_product"."id" = 2; | | T1 updates 20 to 13. |
| Step 6 | COMMIT; | | T1 commits. |
| Step 7 | | UPDATE "store_product" SET "stock" = 16 WHERE "store_product"."id" = 2; ERROR: could not serialize access due to concurrent update | T2 cannot update 13 to 16 after T1 commits. |
| Step 8 | | ROLLBACK; | T2 rolls back. *Lost update doesn't occur.* |
Next, for write skew, I created a store_doctor table with id, name and on_call columns with models.py as shown below.

store_doctor table:

| id | name | on_call |
|---|---|---|
| 1 | John | True |
| 2 | Lisa | True |
# "store/models.py"
from django.db import models
class Doctor(models.Model):
name = models.CharField(max_length=30)
on_call = models.BooleanField()
Then, I created and ran the test code for write skew as shown below:
# "store/views.py"
# ...
@transaction.atomic
def transaction1(flow):
    while True:
        while True:
            if flow[0] == "Step 1":
                print("T1", flow[0], "BEGIN")
                flow[0] = "Step 2"
                break
        while True:
            if flow[0] == "Step 2":
                print("T1", flow[0], "SELECT")
                doctor_count = Doctor.objects.filter(on_call=True).count()
                print(doctor_count)
                flow[0] = "Step 3"
                break
        while True:
            if flow[0] == "Step 5":
                print("T1", flow[0], "UPDATE")
                Doctor.objects.filter(id=1).update(on_call=False)
                flow[0] = "Step 6"
                break
        while True:
            if flow[0] == "Step 6":
                print("T1", flow[0], "COMMIT")
                flow[0] = "Step 7"
                break
        break
@transaction.atomic
def transaction2(flow):
    while True:
        while True:
            if flow[0] == "Step 3":
                print("T2", flow[0], "BEGIN")
                flow[0] = "Step 4"
                break
        while True:
            if flow[0] == "Step 4":
                print("T2", flow[0], "SELECT")
                doctor_count = Doctor.objects.filter(on_call=True).count()
                print(doctor_count)
                flow[0] = "Step 5"
                break
        while True:
            if flow[0] == "Step 7":
                print("T2", flow[0], "UPDATE")
                Doctor.objects.filter(id=2).update(on_call=False)
                flow[0] = "Step 8"
                break
        while True:
            if flow[0] == "Step 8":
                print("T2", flow[0], "COMMIT")
                break
        break
# ...
Then, write skew occurred, according to the result below on the console, because write skew occurs in the READ COMMITTED isolation level in PostgreSQL:
T1 Step 1 BEGIN
T1 Step 2 SELECT # Reads the same data
2
T2 Step 3 BEGIN
T2 Step 4 SELECT # Reads the same data
2
T1 Step 5 UPDATE # Writes 'False' to John's "on_call"
T1 Step 6 COMMIT # And commits the write
T2 Step 7 UPDATE # Writes 'False' to Lisa's "on_call"
T2 Step 8 COMMIT # And commits the write
And also, I could get the SQL query logs of PostgreSQL below:
[11252]: BEGIN
[11252]: SELECT COUNT(*)
AS "__count"
FROM "store_doctor"
WHERE "store_doctor"."on_call"
[2368]: BEGIN
[2368]: SELECT COUNT(*)
AS "__count"
FROM "store_doctor"
WHERE "store_doctor"."on_call"
[11252]: UPDATE "store_doctor"
SET "on_call" = false
WHERE "store_doctor"."id" = 1
[11252]: COMMIT
[2368]: UPDATE "store_doctor"
SET "on_call" = false
WHERE "store_doctor"."id" = 2
[2368]: COMMIT
And this table below shows the flow and the SQL query logs of PostgreSQL above:

| Flow | Transaction 1 (T1) | Transaction 2 (T2) | Explanation |
|---|---|---|---|
| Step 1 | BEGIN; | | T1 starts. |
| Step 2 | SELECT COUNT(*) AS "__count" FROM "store_doctor" WHERE "store_doctor"."on_call"; 2 | | T1 reads 2 so John can take a rest. |
| Step 3 | | BEGIN; | T2 starts. |
| Step 4 | | SELECT COUNT(*) AS "__count" FROM "store_doctor" WHERE "store_doctor"."on_call"; 2 | T2 reads 2 so Lisa can take a rest. |
| Step 5 | UPDATE "store_doctor" SET "on_call" = false WHERE "store_doctor"."id" = 1; | | T1 updates True to False which means John takes a rest. |
| Step 6 | COMMIT; | | T1 commits. |
| Step 7 | | UPDATE "store_doctor" SET "on_call" = false WHERE "store_doctor"."id" = 2; | T2 updates True to False which means Lisa takes a rest. |
| Step 8 | | COMMIT; | T2 commits. John and Lisa both take a rest. *Write skew occurs.* |
Next, because write skew doesn't occur in the SERIALIZABLE isolation level in PostgreSQL, I set SERIALIZABLE with psql as shown below:
postgres=# ALTER DATABASE postgres SET DEFAULT_TRANSACTION_ISOLATION TO 'SERIALIZABLE';
Then I ran the code above again, and write skew didn't occur, because at T2 Step 7 the update was refused and these 2 exceptions occurred:
T1 Step 1 BEGIN
T1 Step 2 SELECT
2
T2 Step 3 BEGIN
T2 Step 4 SELECT
2
T1 Step 5 UPDATE
T1 Step 6 COMMIT
T2 Step 7 UPDATE # ↓↓ 2 exceptions occurred ↓↓
psycopg2.errors.SerializationFailure: could not serialize access due to read/write dependencies among transactions
django.db.utils.OperationalError: could not serialize access due to read/write dependencies among transactions
And also, the update was refused with 1 error and the 2nd transaction was rolled back, according to the SQL query logs of PostgreSQL below:
[80642]: BEGIN
[80642]: SELECT COUNT(*) AS "__count"
FROM "store_doctor"
WHERE "store_doctor"."on_call"
[4244]: BEGIN
[4244]: SELECT COUNT(*) AS "__count"
FROM "store_doctor"
WHERE "store_doctor"."on_call"
[80642]: UPDATE "store_doctor"
SET "on_call" = false
WHERE "store_doctor"."id" = 1
[80642]: COMMIT
[4244] ERROR: could not serialize access due to read/write dependencies among transactions # Here
[4244] DETAIL: Reason code: Canceled on identification as a pivot, during write.
[4244] HINT: The transaction might succeed if retried.
[4244] STATEMENT: UPDATE "store_doctor"
SET "on_call" = false
WHERE "store_doctor"."id" = 2
[4244]: ROLLBACK # Here
And this table below shows the flow and the SQL query logs of PostgreSQL above:

| Flow | Transaction 1 (T1) | Transaction 2 (T2) | Explanation |
|---|---|---|---|
| Step 1 | BEGIN; | | T1 starts. |
| Step 2 | SELECT COUNT(*) AS "__count" FROM "store_doctor" WHERE "store_doctor"."on_call"; 2 | | T1 reads 2 so John can take a rest. |
| Step 3 | | BEGIN; | T2 starts. |
| Step 4 | | SELECT COUNT(*) AS "__count" FROM "store_doctor" WHERE "store_doctor"."on_call"; 2 | T2 reads 2 so Lisa can take a rest. |
| Step 5 | UPDATE "store_doctor" SET "on_call" = false WHERE "store_doctor"."id" = 1; | | T1 updates True to False which means John takes a rest. |
| Step 6 | COMMIT; | | T1 commits. |
| Step 7 | | UPDATE "store_doctor" SET "on_call" = false WHERE "store_doctor"."id" = 2; ERROR: could not serialize access due to read/write dependencies among transactions | T2 cannot update True to False which means Lisa cannot take a rest. |
| Step 8 | | ROLLBACK; | T2 rolls back. Only John takes a rest. *Write skew doesn't occur.* |
Upvotes: 2
Reputation: 1572
From what I'm reading, you want to cover the path where you handle the exception. Let me ask you this: do you really need it to be triggered by an actual multithreaded race condition, or do you just want to make sure that when it happens your code does the right thing?
Here's what I would do:
import unittest
import mock


# added just to mimic django's orm for the purpose of the demo
class QuerySet(object):
    def delete(self):
        pass


def get_object_from_db():
    return QuerySet()


def some_method():
    try:
        objectA = get_object_from_db()
        objectA.delete()
        return True  # this should be whatever you want to do in case it worked
    except Exception:  # I would look up and check whatever error the django orm is raising.
        return False  # this should be whatever you want to do in case it didn't work


class TestClass(unittest.TestCase):
    def test_some_method_in_case_it_worked(self):
        self.assertEqual(some_method(), True)

    def test_some_method_in_case_it_did_not_work(self):
        with mock.patch('__main__.get_object_from_db') as mocked_get_object_from_db:
            mocked_get_object_from_db.side_effect = RuntimeError('a message')
            self.assertEqual(some_method(), False)


if __name__ == '__main__':
    unittest.main()
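If you want the simulated failure to look more like what the ORM would raise, the side effect can be one of Django's own database exceptions instead of a RuntimeError. This extra test method is my own sketch and would be added to TestClass above:

    def test_some_method_in_case_the_db_raises(self):
        # same idea as above, but with the kind of exception Django's ORM raises
        from django.db import DatabaseError

        with mock.patch('__main__.get_object_from_db') as mocked_get_object_from_db:
            mocked_get_object_from_db.side_effect = DatabaseError('simulated failure')
            self.assertEqual(some_method(), False)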
mock is now part of the standard library. https://pypi.python.org/pypi/mock
Doing this saves you from having flaky tests (you know, the ones that randomly fail).
Upvotes: 2