Mikhail Puzanov

Lock all the rows in the PostgreSQL table

I'm trying to create a sort of task queue in a PostgreSQL table, similar to this https://www.pgcon.org/2016/schedule/attachments/414_queues-pgcon-2016.pdf, but a little more complicated.

1) There are tasks that are associated with a certain entity_id; tasks with different entity_id values can be executed in parallel. For them there's a table:

create table entity_tasks (
  entity_id bigint,
  task text,
  inserted_at timestamp default now()
);

2) There are tasks that have to be executed exclusively, i.e. serialized with respect to all other tasks. For this type of task there is a table as well:

create table block_everything_tasks (
  task TEXT,
  inserted_at TIMESTAMP DEFAULT NOW()
);

Executing a task from block_everything_tasks should block execution of all the tasks from entity_tasks and from block_everything_tasks.

After some prototyping I also added a table:

create table entities_for_tasks (
  entity_id bigint primary key
);

Fetching and executing tasks per-entity works like this:

begin;
    select entity_id into entity_to_lock  -- pseudo-code: entity_to_lock is a client-side variable
    from entities_for_tasks
    limit 1
    for update skip locked;

    select * from entity_tasks
    where entity_id = entity_to_lock
    order by inserted_at
    limit 1;

    -- execute them and delete from the `entity_tasks`
commit;

So far so good, but it gets awkward when I try to implement fetching tasks from block_everything_tasks. I see some solutions, but I don't like any of them.

1) I can lock the whole entities_for_tasks table explicitly, like this:

begin;
    lock table entities_for_tasks;

    select * from block_everything_tasks
    order by inserted_at
    limit 1;

    -- execute the task and delete it from `block_everything_tasks`
commit;

but that will prevent adding rows to entities_for_tasks and may block adding tasks to one of the queues.

2) Or I can try to do something like this:

begin;
    with lock as (
      select * from entities_for_tasks for update
    )
    select * from block_everything_tasks
    order by inserted_at
    limit 1
    for update skip locked;

    -- execute the task and delete it from `block_everything_tasks`
commit;

It looks like an okay-ish solution: I don't block submitters, and entities_for_tasks is not too big anyway. But the CTE is never referenced by the main query, so PostgreSQL never even executes it; the rows are not actually locked, and it just doesn't work.

So my question is: how can I make tasks from block_everything_tasks exclude all other tasks, without blocking inserts into the queues?


Answers (1)

Nick Barnes

Both INSERT and UPDATE acquire a ROW EXCLUSIVE lock, so you won't find any table-level lock which excludes one but not the other.

You can lock all existing rows against changes with a SELECT FOR UPDATE, but it won't affect concurrently INSERTed records, so they'd still be picked up and processed, regardless of what tasks are currently running.
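For illustration, the failure mode looks roughly like this (a sketch across three sessions; entity ID 42 is arbitrary):

-- Session A: lock every existing row
BEGIN;
SELECT * FROM entities_for_tasks FOR UPDATE;

-- Session B: not blocked, because row locks only cover rows that already exist
INSERT INTO entities_for_tasks (entity_id) VALUES (42);

-- Session C: a worker using SKIP LOCKED can immediately pick up entity 42,
-- even though session A still holds locks on "all" the rows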

There might also be issues with keeping the entities_for_tasks table in sync with entity_tasks, depending on exactly how you're populating it and what isolation level you're using; this kind of pattern is prone to race conditions at anything below SERIALIZABLE.


Taking a step back, you really have two distinct problems to solve: creating and allocating tasks, and coordinating the execution of tasks. The first problem is handled perfectly by the basic queueing mechanism, but attempting to solve the second by overloading that same mechanism seems to be the source of all of these conflicts.

So, leave the queue alone, and think about what else you actually need to coordinate task execution:

  1. A lock which says "a task is running"
  2. A set of locks which say "a task is running against entity x"

...where a task from block_everything_tasks needs an exclusive lock on (1), while tasks from entity_tasks can share a lock on (1) with each other, but each needs an exclusive lock on its own entity in (2).

The most explicit way to implement this is through advisory locks, which let you "lock" arbitrary integers which hold some application-specific meaning.

Assuming no entity has ID 0, let's use that for the top-level "task is running" lock. Then, after successfully pulling a task from the queue, each exclusive task would run:

SELECT pg_advisory_xact_lock(0);

...and each per-entity task would run:

SELECT pg_advisory_xact_lock_shared(0);
SELECT pg_advisory_xact_lock(<entity_id of selected task>);
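For concreteness, a minimal sketch of both worker transactions, assuming the task row has already been pulled from the queue as in the question (42 stands in for the selected task's entity_id):

-- Exclusive worker: waits until no per-entity task holds a share on lock 0
BEGIN;
SELECT pg_advisory_xact_lock(0);
-- <run the task, delete it from block_everything_tasks>
COMMIT;

-- Per-entity worker: shares lock 0 with other per-entity workers,
-- but queues behind any exclusive task
BEGIN;
SELECT pg_advisory_xact_lock_shared(0);
SELECT pg_advisory_xact_lock(42);  -- 42 = entity_id of the selected task
-- <run the task, delete it from entity_tasks>
COMMIT;

Since these are the pg_advisory_xact_* variants, there's no explicit unlock step: commit or rollback releases them, so a crashed worker can't leave the locks stranded.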

The main problem with advisory locking is that every user of the database needs to agree on what these integers mean, or they might end up contending for the same lock for unrelated purposes. The two-parameter (int,int) overloads of the locking functions allow you to scope your locks to a particular use case, but that's not much help when your IDs are bigints.
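For example, with a hypothetical application-specific namespace of 42, the scoped form would be:

-- (42, n) keys live in a separate keyspace from the single-bigint keys,
-- so unrelated code calling pg_advisory_xact_lock(<bigint>) can't collide with them
SELECT pg_advisory_xact_lock(42, 0);  -- scoped "task is running" lock

-- ...but both parameters are int4, so a bigint entity_id doesn't fit:
-- SELECT pg_advisory_xact_lock(42, entity_id::int);  -- errors beyond the int4 range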

If you can't be sure you're the only one in your database using advisory locks, you can emulate this with a table-based approach. Set up the table:

CREATE TABLE currently_processing (
  entity_id bigint PRIMARY KEY
);

...then for exclusive tasks:

LOCK currently_processing;

...and for per-entity tasks:

INSERT INTO currently_processing VALUES (<entity_id of selected task>);
<run the task>
DELETE FROM currently_processing WHERE entity_id = <entity_id of selected task>;

The INSERTs acquire a ROW EXCLUSIVE lock on the table (which conflicts with the ACCESS EXCLUSIVE lock taken by the exclusive task's LOCK statement), and the unique index behind the PRIMARY KEY will cause concurrent INSERTs for the same ID to block until the conflicting transaction commits or rolls back.
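Wrapped in transactions, the two flows might look like this (again a sketch, with 42 as the selected task's entity_id):

-- Exclusive task: LOCK defaults to ACCESS EXCLUSIVE, so it waits out
-- in-flight INSERTs and blocks new ones until commit
BEGIN;
LOCK currently_processing;
-- <run the task, delete it from block_everything_tasks>
COMMIT;

-- Per-entity task: the uncommitted INSERT keeps the entity "claimed"
-- for the whole transaction
BEGIN;
INSERT INTO currently_processing VALUES (42);
-- <run the task, delete it from entity_tasks>
DELETE FROM currently_processing WHERE entity_id = 42;
COMMIT;

A nice side effect of keeping it all in one transaction is that a rollback undoes the INSERT automatically, so a failed worker never leaves a stale claim behind.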
