Reputation: 14583
Here is pseudo code for what I'm trying to do:
rate_count = SELECT COUNT(id) FROM job WHERE last_processed_at >= ?
current_limit = rate_limit - rate_count
if current_limit > 0
    UPDATE job SET state='processing'
    WHERE id IN (
        SELECT id FROM job
        WHERE state='pending'
        LIMIT :current_limit
    )
I have it working except for concurrency issues. When it runs from multiple sessions at the same time, both sessions SELECT the same rows and therefore update the same rows :(
I'm able to make the second query atomic by adding FOR UPDATE to its SELECT subquery, but I can't add FOR UPDATE to the first query because FOR UPDATE isn't allowed with aggregate functions.
How can I make this piece an atomic transaction?
Upvotes: 5
Views: 2704
Reputation: 1
I got the same error:
ERROR: FOR UPDATE is not allowed with aggregate functions
because I used count() together with FOR UPDATE, as shown below:
SELECT count(*) FROM person FOR UPDATE;
So I changed the query above to one of the two queries below:
SELECT count(*) FROM (SELECT * FROM person FOR UPDATE) AS result;
WITH result AS (SELECT * FROM person FOR UPDATE) SELECT count(*) FROM result;
Then I could use count() and FOR UPDATE together:
 count
-------
     7
(1 row)
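One thing worth keeping in mind with either rewrite: the row locks taken by FOR UPDATE last until the end of the transaction, so the count only protects anything if the follow-up statements run in the same transaction. A minimal sketch, assuming the same person table and an explicit transaction (the placeholder comment stands for whatever work depends on the count):

```sql
BEGIN;

-- The inner SELECT locks every row of person; the outer query only counts them.
SELECT count(*) FROM (SELECT * FROM person FOR UPDATE) AS result;

-- ... statements that rely on the count go here ...

COMMIT;  -- the row locks are released when the transaction ends
```

Run in autocommit mode, the locks would be released as soon as the SELECT finishes. Also note that this locks all rows the inner SELECT returns, which can block other writers on a large table.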
Upvotes: 0
Reputation: 4582
You can do FOR UPDATE within a subquery:
rate_count := COUNT(id)
FROM (
    SELECT id FROM job
    WHERE last_processed_at >= ? FOR UPDATE
) a;
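If you go the PL/pgSQL route, the two steps from the question could be wrapped in one function so that they always run inside the same transaction. This is only a sketch under assumptions: claim_jobs, p_rate_limit, p_since and claimed are names made up for illustration, and the job table is assumed to look like the one in the question.

```sql
-- Hypothetical helper: returns how many pending jobs were switched to 'processing'.
CREATE OR REPLACE FUNCTION claim_jobs(p_rate_limit integer, p_since timestamptz)
RETURNS integer
LANGUAGE plpgsql
AS $$
DECLARE
    rate_count integer;
    claimed    integer := 0;
BEGIN
    -- Lock the recently processed rows in the subquery, count them outside it.
    SELECT COUNT(id) INTO rate_count
    FROM (
        SELECT id FROM job
        WHERE last_processed_at >= p_since
        FOR UPDATE
    ) a;

    IF p_rate_limit - rate_count > 0 THEN
        UPDATE job SET state = 'processing'
        WHERE id IN (
            SELECT id FROM job
            WHERE state = 'pending'
            LIMIT (p_rate_limit - rate_count)
            FOR UPDATE
        );
        -- Number of rows changed by the UPDATE above.
        GET DIAGNOSTICS claimed = ROW_COUNT;
    END IF;

    RETURN claimed;
END;
$$;
```

It could then be called as, for example, SELECT claim_jobs(10, now() - interval '1 minute'); where the limit and window are placeholder values. Keep in mind that FOR UPDATE only locks rows that already match the WHERE clause, so it does not stop another session from making additional rows match in the meantime.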
You can also do this whole thing in a single query:
UPDATE job SET state='processing'
WHERE id IN (
    SELECT id FROM job
    WHERE state='pending'
    LIMIT (SELECT GREATEST(0, rate_limit - COUNT(id))
           FROM (SELECT id FROM job
                 WHERE last_processed_at >= ? FOR UPDATE) a
    )
)
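The single statement above locks the rows that are counted, but the SELECT that picks the pending rows takes no lock of its own. A possible variation, not tested against the asker's schema, keeps the FOR UPDATE the question already uses on that subquery and adds SKIP LOCKED (available in PostgreSQL 9.5 and later) so that a second session passes over rows another session has already claimed instead of waiting on them. The bind parameters :rate_limit and :since are placeholders chosen for this sketch.

```sql
UPDATE job SET state = 'processing'
WHERE id IN (
    SELECT id FROM job
    WHERE state = 'pending'
    LIMIT (
        -- Remaining budget: the limit minus the rows processed in the window.
        SELECT GREATEST(0, :rate_limit - COUNT(id))
        FROM (
            SELECT id FROM job
            WHERE last_processed_at >= :since
            FOR UPDATE
        ) a
    )
    -- Lock the chosen pending rows; skip any that another session holds.
    FOR UPDATE SKIP LOCKED
)
RETURNING id;  -- the ids this session actually claimed
```

RETURNING id is optional; it is there so each session can see which jobs it took. Whether skipping or waiting is the right behaviour depends on how strictly the rate limit has to hold under concurrency.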
Upvotes: 4