I'm trying to create a sort of task queue in a PG table, similar to this https://www.pgcon.org/2016/schedule/attachments/414_queues-pgcon-2016.pdf but a little more complicated.
1) There are tasks that are associated with a certain entity_id; they can be executed in parallel when their entity_ids differ. For them there's a table:
create table entity_tasks (
    entity_id bigint,
    task text,
    inserted_at timestamp default now()
);
2) There are tasks that have to be executed exclusively, i.e. sequentially with all the other tasks. For this type of task there is a table as well:
create table block_everything_tasks (
    task text,
    inserted_at timestamp default now()
);
Executing a task from block_everything_tasks should block execution of all the tasks from entity_tasks and from block_everything_tasks.
After some prototyping I also added a table:
create table entities_for_tasks (
    entity_id bigint primary key
);
Fetching and executing per-entity tasks works like this:
begin;
-- pseudocode: entity_to_lock stands for a client-side variable
select entity_id into entity_to_lock
from entities_for_tasks
for update skip locked
limit 1;
select * from entity_tasks
where entity_id = entity_to_lock
order by inserted_at
limit 1;
-- execute the task and delete it from `entity_tasks`
commit;
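For clarity, the select ... into entity_to_lock above is pseudocode for fetching the ID into a client-side variable; the same fetch can also be written as a single statement, with a CTE standing in for the variable:
begin;
with next_entity as (
    select entity_id
    from entities_for_tasks
    for update skip locked
    limit 1
)
select t.*
from entity_tasks t
join next_entity using (entity_id)
order by t.inserted_at
limit 1;
-- execute the task and delete it from `entity_tasks`
commit;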
So far so good, but it gets awkward when I try to implement fetching tasks from block_everything_tasks. I see some solutions here, but don't like any of them.
1) I can lock the whole entities_for_tasks table explicitly, like this:
begin;
lock table entities_for_tasks;
select * from block_everything_tasks
order by inserted_at
limit 1;
-- execute the task and delete it from `block_everything_tasks`
commit;
but that will prevent adding rows to entities_for_tasks and may block adding tasks to one of the queues.
2) Or I can try to do something like this:
begin;
with lock as (
    select * from entities_for_tasks for update
)
select * from block_everything_tasks
order by inserted_at
for update skip locked
limit 1;
-- execute the task and delete it from `block_everything_tasks`
commit;
It looks like an okay-ish solution: I don't block submitters, and entities_for_tasks is not too big anyway. But the outer query never consumes the rows from the lock CTE, so they are never actually locked, and it just doesn't work.
So my question is: can I lock the entities_for_tasks table in option (1) in such a way that inserts are still possible, but select * from entities_for_tasks where ... for update is blocked?
Answer:
Both INSERT and UPDATE acquire a ROW EXCLUSIVE lock, so you won't find any table-level lock which excludes one but not the other. You can lock all existing rows against changes with a SELECT FOR UPDATE, but it won't affect concurrently INSERTed records, so they'd still be picked up and processed, regardless of what tasks are currently running.
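To see why, here's a quick two-session sketch (using entities_for_tasks just for illustration):
-- session 1
BEGIN;
SELECT * FROM entities_for_tasks FOR UPDATE;  -- locks every row that exists right now

-- session 2 (autocommit), while session 1 is still open
INSERT INTO entities_for_tasks VALUES (42);   -- not blocked: the new row was never locked
SELECT * FROM entities_for_tasks
WHERE entity_id = 42 FOR UPDATE;              -- not blocked either, once the insert commits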
There might also be issues with keeping the entities_for_tasks table in sync with entity_tasks, depending on exactly how you're populating it and what isolation level you're using; this kind of pattern is prone to race conditions at anything below SERIALIZABLE.
Taking a step back, you really have two distinct problems to solve: creating and allocating tasks, and coordinating the execution of tasks. The first problem is handled perfectly by the basic queueing mechanism, but attempting to solve the second by overloading that same mechanism seems to be the source of all of these conflicts.
So, leave the queue alone, and think about what else you actually need to coordinate task execution:
(1) the right to run any task at all
(2) the right to run a task for a particular entity
...where a task from block_everything_tasks needs an exclusive lock on (1), while tasks from entity_tasks can share a lock on (1) with each other, but need an exclusive lock on (2).
The most explicit way to implement this is through advisory locks, which let you "lock" arbitrary integers that carry some application-specific meaning.
Assuming no entity has ID 0, let's use that for the top-level "task is running" lock. Then, after successfully pulling a task from the queue, each exclusive task would run:
SELECT pg_advisory_xact_lock(0);
...and each per-entity task would run:
SELECT pg_advisory_xact_lock_shared(0);
SELECT pg_advisory_xact_lock(<entity_id of selected task>);
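Putting the two together, a per-entity worker transaction might look like this sketch (the queue fetch is the one from the question; entity ID 7 is made up):
BEGIN;

-- pull an available entity, as in the question (say it returns entity_id = 7)
SELECT entity_id
FROM entities_for_tasks
FOR UPDATE SKIP LOCKED
LIMIT 1;

-- coordinate with exclusive tasks, then claim this entity
SELECT pg_advisory_xact_lock_shared(0);
SELECT pg_advisory_xact_lock(7);

-- fetch the oldest task for entity 7, execute it, delete it from entity_tasks

COMMIT;  -- transaction-scoped advisory locks are released automatically here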
The main problem with advisory locking is that every user of the database needs to agree on what these integers mean, or they might end up contending for the same lock for unrelated purposes. The two-parameter (int, int) overloads of the locking functions allow you to scope your locks to a particular use case, but that's not much help when your IDs are bigints.
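For IDs that do fit in 32 bits, the scoped form looks like this (12345 is a made-up application-specific key):
SELECT pg_advisory_xact_lock(12345, 0);         -- exclusive "task is running" lock
SELECT pg_advisory_xact_lock_shared(12345, 0);  -- its shared counterpart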
If you can't be sure you're the only one in your database using advisory locks, you can emulate this with a table-based approach. Set up the table:
CREATE TABLE currently_processing (
    entity_id bigint PRIMARY KEY
);
...then for exclusive tasks:
LOCK currently_processing;
...and for per-entity tasks:
INSERT INTO currently_processing VALUES (<entity_id of selected task>);
<run the task>
DELETE FROM currently_processing WHERE entity_id = <entity_id of selected task>;
The INSERTs will try to acquire a shared lock on the table (blocked by the exclusive task), and the unique index on the PRIMARY KEY will cause concurrent INSERTs for the same ID to block until the conflicting transaction commits or rolls back.
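For completeness, here's a sketch of both sides wrapped in transactions (LOCK defaults to ACCESS EXCLUSIVE mode and is only valid inside a transaction block; the per-entity INSERT has to stay uncommitted while the task runs for the blocking to work):
-- exclusive task
BEGIN;
LOCK currently_processing;          -- waits for per-entity tasks in flight,
                                    -- then blocks new INSERTs
-- run the task, delete it from block_everything_tasks
COMMIT;

-- per-entity task, entity ID 7 as an example
BEGIN;
INSERT INTO currently_processing VALUES (7);  -- blocked while an exclusive task holds the lock
-- run the task, delete it from entity_tasks
DELETE FROM currently_processing WHERE entity_id = 7;
COMMIT;                                       -- releases the row and table locks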