Oleg Golovanov

Reputation: 924

PostgreSQL 9.3 -> several consumers + locking

There are 2 tables:

CREATE TABLE "job"
(
    "id" SERIAL,
    "processed" BOOLEAN NOT NULL,
    PRIMARY KEY("id")
);
CREATE TABLE "job_result"
(
    "id" SERIAL,
    "job_id" INT NOT NULL,
    PRIMARY KEY("id")
);

There are several consumers that each do the following (sequentially):
1) start a transaction
2) search for a job not processed yet
3) process it
4) save the result (set the processed field to true and insert into job_result)
5) commit

Questions:
1) Is the following SQL code correct, so that no job can be processed more than once?
2) If it is correct, can it be rewritten in a cleaner way? (I am confused about "UPDATE job SET id = id".)

UPDATE job 
SET id = id 
WHERE id = 
(
    SELECT MIN(id) 
    FROM job
    WHERE processed = false AND pg_try_advisory_lock(id) = true
)
AND processed = false
RETURNING *
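
Putting the steps together, one consumer iteration looks roughly like this (just a sketch; 42 stands in for whatever id the claiming UPDATE returns):

BEGIN;

-- step 2: the claiming UPDATE ... RETURNING * shown above runs here

-- step 3: the application processes the returned job

-- step 4: save the result (42 is a placeholder for the returned id)
UPDATE job SET processed = true WHERE id = 42;
INSERT INTO job_result (job_id) VALUES (42);

-- step 5
COMMIT;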

Thanks.

Upvotes: 0

Views: 403

Answers (2)

jjanes

Reputation: 44363

Question 1

To answer your first question, the processing can be done twice if the database crashes between step 3 and step 5. When the server/service recovers, it will be processed again.

If the processing step only computes results which are sent to the database in the same connection as the queuing queries, then no one will be able to see that it was processed twice, as the results of the first time were never visible.

However if the processing step talks to the outside world, such as sending an email or charging a credit card, that action will be taken twice and both will be visible. The only way to avoid that is to use two-phase commits for all dealings with the outside world. Also, if the worker keeps two connections to the database and is not disciplined about their use, then that can also lead to visible double-processing.

Question 2

For your second question, there are several ways it can be made cleaner.

Most importantly, you will want to change the advisory lock from session-duration to transaction-duration. If you leave it at session-duration, long-lived workers will become slower and slower and will use more and more memory as time goes on. This is safe to do, because in the query as written you are checking the processed flag in both the sub-select and in the update itself.
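
To illustrate the difference (a minimal sketch; 12345 is just an arbitrary advisory lock key):

BEGIN;
SELECT pg_try_advisory_lock(12345);       -- session-level: still held after the commit
COMMIT;
SELECT pg_advisory_unlock(12345);         -- must be released explicitly (or the session must end)

BEGIN;
SELECT pg_try_advisory_xact_lock(12345);  -- transaction-level
COMMIT;                                   -- the lock is released automatically here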

You could also make the table structure itself cleaner. You could have one table with both the processed flag and the results field, instead of two tables. Or, if you want two tables, you could remove the processed flag from the job table and signify completion simply by deleting the completed record from the table, rather than updating the processed flag.
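
A rough sketch of that delete-based variant (this sketch assumes the processed flag is dropped entirely, so any row still present in job means "not yet processed"; it also uses the transaction-duration advisory lock discussed above):

WITH done AS (
    DELETE FROM job
    WHERE id =
    (
        SELECT MIN(id)
        FROM job
        WHERE pg_try_advisory_xact_lock(id) = true
    )
    RETURNING id
)
INSERT INTO job_result (job_id)
SELECT id FROM done;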

Assuming you don't want to make such changes, you can still clean up the SQL without changing the table structure or semantics. You do need to lock the tuple to avoid a race condition with the release of the advisory lock. But rather than using the degenerate id = id construct (which some future maintainer is likely to remove, because it is not intuitively obvious why it is even there), you might as well set the tuple to its final state by setting processed = true, and then drop the now-redundant update from your step 4. This is safe because you do not issue an intermediate commit, so no one can see the tuple in the intermediate state of having processed = true while not yet really being processed.

UPDATE job 
SET processed = true 
WHERE id = 
(
    SELECT MIN(id) 
    FROM job
    WHERE processed = false AND pg_try_advisory_xact_lock(id) = true
)
AND processed = false
RETURNING id

However, this query still has the unwanted feature that a worker looking for the next job will often find no rows at all. That happens when it loses a race for the chosen id: another worker gets there first, and the outer processed = false condition then correctly filters the row out. This is OK as long as your workers are prepared to retry, but it leads to needless contention in the database. It can be improved by making the inner select lock the tuple as soon as it encounters it, switching from MIN(id) to a LIMIT 1 query:

UPDATE job
SET processed = true
WHERE id =
(
    SELECT id
    FROM job
    WHERE processed = false AND pg_try_advisory_xact_lock(id) = true
    ORDER BY id
    LIMIT 1
    FOR UPDATE
)
RETURNING id

If PostgreSQL allowed ORDER BY and LIMIT on an UPDATE, you could avoid the subselect altogether, but that is not currently implemented (maybe it will be in 9.5).

For good performance (or even to avoid memory errors), you will need an index like:

create index on job (id) where processed = false;

Upvotes: 0

Clodoaldo Neto

Reputation: 125454

with job_update as (
    -- claim the lowest unprocessed job and mark it done
    update job
    set processed = true
    where id = (
        select id
        from (
            select min(id) as id
            from job
            where processed = false
        ) s
        for update
    )
    returning id
)
-- record the result for the claimed job in the same statement
insert into job_result (job_id)
select id
from job_update

Upvotes: 0
