Josh

Reputation: 12791

Atomic read and update in MySQL with concurrent workers

Say I have multiple workers that can concurrently read and write against a MySQL table (e.g. jobs). The task for each worker is:

  1. Find the oldest QUEUED job
  2. Set its status to RUNNING
  3. Return the corresponding ID.

Note that there may not be any qualifying (i.e. QUEUED) jobs when a worker runs step #1.

I have the following pseudo-code so far. I believe I need to cancel (ROLLBACK) the transaction if step #1 returns no jobs. How would I do that in the code below?

BEGIN TRANSACTION;

# Update the status of jobs fetched by this query:
SELECT id from jobs WHERE status = "QUEUED" 
ORDER BY created_at ASC LIMIT 1;

# Do the actual update, otherwise abort (i.e. ROLLBACK?)
UPDATE jobs
SET status="RUNNING"
# HERE: Not sure how to make this conditional on the previous ID
# WHERE id = <ID from the previous SELECT>

COMMIT;

Upvotes: 6

Views: 1654

Answers (4)

Rick James

Reputation: 142306

(This is not an answer to the question, but a list of caveats that you need to be aware of when using any of the real Answers. Some of these have already been mentioned.)

  • Replication -- You must do all the locking on the Primary. If you are using a cluster with multiple writable nodes, be aware of the inter-node delays.
  • Backlog -- When something breaks, you could get a huge list of tasks in the queue. This may lead to some ugly messes.
  • Number of 'workers' -- Don't have more than a "few" workers. If you try to have, say, 100 concurrent workers, they will stumble over each other and cause nasty problems.
  • Reaper -- Since a worker may crash, the task assigned to it may never get cleared. Have a TIMESTAMP on the rows so a separate (cron/EVENT/whatever) job can discover what tasks are long overdue and clear them.
  • If the tasks are fast enough, then the overhead of the queue could be a burden. That is, "Don't queue it, just do it."
  • You are right to grab the task in one transaction, then later release the task in a separate transaction. Using InnoDB's locking is folly for anything but trivially fast actions.
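The "Reaper" caveat above could be sketched roughly as follows. This is only an illustration under assumptions: `requeue_stale_jobs` is a hypothetical helper, `conn` is any DB-API 2.0 connection to MySQL that uses the `%s` parameter style (PyMySQL, for example), the `jobs` table has an `updated_at` TIMESTAMP column, and "clearing" a task means putting it back in the queue.

```python
def requeue_stale_jobs(conn, max_age_seconds=600):
    """Put RUNNING jobs that have not been touched recently back in the queue.

    Assumptions: `conn` is a DB-API 2.0 connection to MySQL using the %s
    parameter style, and `jobs.updated_at` is a TIMESTAMP that changes
    whenever the row is updated.
    """
    cur = conn.cursor()
    try:
        cur.execute(
            "UPDATE jobs SET status = 'QUEUED' "
            "WHERE status = 'RUNNING' "
            "AND updated_at < NOW() - INTERVAL %s SECOND",
            (int(max_age_seconds),),
        )
        requeued = cur.rowcount  # number of rows the UPDATE changed
        conn.commit()
        return requeued
    finally:
        cur.close()
```

A cron job could call this every few minutes. The timeout has to be longer than the slowest legitimate job, otherwise a job that is still running would be handed to a second worker.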

Upvotes: 0

Marco Marsala

Reputation: 2462

Add SKIP LOCKED to the locking SELECT (SELECT ... FOR UPDATE SKIP LOCKED) and run it inside a transaction that is committed only when the job is done. This avoids jobs getting stuck in status RUNNING if a worker crashes, because the uncommitted transaction will be rolled back. SKIP LOCKED is now supported in recent versions of most common DBMSs, including MySQL 8.0.

See:

Select only unlocked rows mysql

https://dev.mysql.com/doc/refman/8.0/en/innodb-locking-reads.html#innodb-locking-reads-nowait-skip-locked
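A minimal sketch of that idea, with several assumptions: `run_next_job` and `handler` are hypothetical names, `conn` is a DB-API 2.0 connection to MySQL 8.0+ with autocommit off and the `%s` parameter style, rows come back as tuples, and a finished job is marked COMPLETED.

```python
def run_next_job(conn, handler):
    """Lock the oldest unlocked QUEUED job, run it, and commit when done.

    The row stays QUEUED (but locked) while `handler` runs. If the worker
    dies, the transaction is rolled back and the job becomes visible again.
    Returns the job id, or None if no unlocked QUEUED job exists.
    """
    cur = conn.cursor()
    try:
        cur.execute(
            "SELECT id FROM jobs WHERE status = 'QUEUED' "
            "ORDER BY created_at LIMIT 1 FOR UPDATE SKIP LOCKED"
        )
        row = cur.fetchone()
        if row is None:
            conn.rollback()  # nothing to do, release the transaction
            return None
        job_id = row[0]
        handler(job_id)  # the actual work happens inside the transaction
        cur.execute(
            "UPDATE jobs SET status = 'COMPLETED' WHERE id = %s", (job_id,)
        )
        conn.commit()
        return job_id
    except Exception:
        conn.rollback()  # the job stays QUEUED for another worker
        raise
    finally:
        cur.close()
```

The trade-off is that the transaction stays open for the whole duration of the job, which is fine for short jobs but ties up a connection and a row lock for long ones.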

Upvotes: 0

Bill Karwin

Reputation: 562428

I am implementing something very similar to your case this week. A number of workers, each grabbing the "next" row in a set of rows to work on.

The pseudocode is something like this:

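BEGIN;

-- Reset first: if no row matches, SELECT ... INTO leaves @id unchanged.
SET @id = NULL;

SELECT id INTO @id FROM mytable WHERE status = 'QUEUED' LIMIT 1 FOR UPDATE;

-- Matches no row when @id is NULL, so an empty queue updates nothing.
UPDATE mytable SET status = 'RUNNING' WHERE id = @id;

COMMIT;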

Using FOR UPDATE is important to avoid race conditions, i.e. more than one worker trying to grab the same row.

See https://dev.mysql.com/doc/refman/8.0/en/select-into.html for information about SELECT ... INTO.
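To make the UPDATE conditional and roll back when nothing is queued, as the question asks, the decision can be made in application code. The following is a sketch only: `claim_next_job` is a hypothetical name, `conn` is assumed to be a DB-API 2.0 connection to MySQL with autocommit off and the `%s` parameter style, rows are assumed to come back as tuples, and the table is the `jobs` table from the question.

```python
def claim_next_job(conn):
    """Mark the oldest QUEUED job as RUNNING and return its id.

    Returns None (after a ROLLBACK) when there is no QUEUED job.
    """
    cur = conn.cursor()
    try:
        # FOR UPDATE locks the selected row until COMMIT or ROLLBACK,
        # so two workers cannot claim the same job.
        cur.execute(
            "SELECT id FROM jobs WHERE status = 'QUEUED' "
            "ORDER BY created_at LIMIT 1 FOR UPDATE"
        )
        row = cur.fetchone()
        if row is None:
            conn.rollback()  # nothing qualified, abort the transaction
            return None
        job_id = row[0]
        cur.execute(
            "UPDATE jobs SET status = 'RUNNING' WHERE id = %s", (job_id,)
        )
        conn.commit()
        return job_id
    except Exception:
        conn.rollback()
        raise
    finally:
        cur.close()
```

A worker would call this in a loop and sleep for a while whenever it returns None.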

Upvotes: 3

Paul Spiegel

Reputation: 31812

It's still not quite clear what you are after. But assuming your task is: find the next QUEUED job, set its status to RUNNING, and select the corresponding ID.

In a single-threaded environment, you can just use your code. Fetch the selected ID into a variable in your application code and pass it to the UPDATE query in the WHERE clause. You don't even need a transaction, since there is only one writing statement. You can mimic this in an SQL script.

Assuming this is your current state:

| id  | created_at          | status   |
| --- | ------------------- | -------- |
| 1   | 2020-06-15 12:00:00 | COMPLETED |
| 2   | 2020-06-15 12:00:10 | QUEUED   |
| 3   | 2020-06-15 12:00:20 | QUEUED   |
| 4   | 2020-06-15 12:00:30 | QUEUED   |

You want to start the next queued job (which has id=2).

SET @id_for_update = (
  SELECT id
  FROM jobs
  WHERE status = 'QUEUED'
  ORDER BY id
  LIMIT 1
);

UPDATE jobs
SET status="RUNNING"
WHERE id = @id_for_update;

SELECT @id_for_update;

You will get

@id_for_update
2

from the last select. And the table will have this state:

| id  | created_at          | status   |
| --- | ------------------- | -------- |
| 1   | 2020-06-15 12:00:00 | COMPLETED |
| 2   | 2020-06-15 12:00:10 | RUNNING  |
| 3   | 2020-06-15 12:00:20 | QUEUED   |
| 4   | 2020-06-15 12:00:30 | QUEUED   |


If multiple processes start jobs, you would need to lock the row with FOR UPDATE. But that can be avoided by using LAST_INSERT_ID(expr) inside a single UPDATE statement:

Starting from the state above, with job 2 already running:

UPDATE jobs
SET status = 'RUNNING',
    id = LAST_INSERT_ID(id)
WHERE status = 'QUEUED'
ORDER BY id
LIMIT 1;

SELECT LAST_INSERT_ID(), ROW_COUNT();

You will get:

| LAST_INSERT_ID() | ROW_COUNT() |
| ---------------- | ----------- |
| 3                | 1           |

And the new state is:

| id  | created_at          | status   |
| --- | ------------------- | -------- |
| 1   | 2020-06-15 12:00:00 | COMPLETED |
| 2   | 2020-06-15 12:00:10 | RUNNING  |
| 3   | 2020-06-15 12:00:20 | RUNNING  |
| 4   | 2020-06-15 12:00:30 | QUEUED   |


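If the UPDATE statement affected no rows (there were no queued jobs), ROW_COUNT() will be 0. In that case LAST_INSERT_ID() still holds whatever value the session had before, so check ROW_COUNT() first.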
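From application code, the same check might look like the sketch below. The names are assumptions: `claim_with_last_insert_id` is hypothetical, and `conn` is taken to be a DB-API 2.0 connection to MySQL with autocommit off whose cursors return tuples. The cursor's `rowcount` plays the role of ROW_COUNT() here.

```python
def claim_with_last_insert_id(conn):
    """Claim the next QUEUED job with a single UPDATE and return its id.

    Returns None when the UPDATE changed no row, because LAST_INSERT_ID()
    would then still hold an older, unrelated value for this connection.
    """
    cur = conn.cursor()
    try:
        cur.execute(
            "UPDATE jobs "
            "SET status = 'RUNNING', id = LAST_INSERT_ID(id) "
            "WHERE status = 'QUEUED' "
            "ORDER BY id LIMIT 1"
        )
        if cur.rowcount == 0:
            conn.rollback()  # no queued job, nothing was changed
            return None
        # LAST_INSERT_ID() is tracked per connection, so this reads the id
        # captured by the UPDATE above and not one from another worker.
        cur.execute("SELECT LAST_INSERT_ID()")
        job_id = cur.fetchone()[0]
        conn.commit()
        return job_id
    finally:
        cur.close()
```

Both statements must run on the same connection, so this needs some care with connection pools that may hand out a different connection per statement.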

There might be some risks that I am not aware of, but this is also not really how I would approach the problem. I would rather store more information in the jobs table. A simple example:

CREATE TABLE jobs (
  id INT auto_increment primary key,
  created_at timestamp not null default now(),
  updated_at timestamp not null default now() on update now(),
  status varchar(50) not null default 'QUEUED',
  process_id varchar(50) null default null
);

and

UPDATE jobs
SET status = 'RUNNING',
    process_id = 'some_unique_pid'    
WHERE status = 'QUEUED'
ORDER BY id
LIMIT 1;

Now a running job belongs to a specific process and you can just select it with

SELECT * FROM jobs WHERE process_id = 'some_unique_pid';

You might even like to have more information, e.g. queued_at, started_at, finished_at.
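For what it's worth, the process_id approach could be wrapped like this. It is a sketch under assumptions: `claim_job_with_token` is a hypothetical name, `conn` is a DB-API 2.0 connection to MySQL with autocommit off and the `%s` parameter style, rows come back as tuples, and a fresh random token is generated per claim so that exactly one RUNNING row can carry it.

```python
import uuid


def claim_job_with_token(conn, token=None):
    """Claim the next QUEUED job by stamping it with a unique token.

    Returns (token, job_id), or None when no QUEUED job was available.
    The 32-character hex token fits the varchar(50) process_id column.
    """
    token = token or uuid.uuid4().hex
    cur = conn.cursor()
    try:
        # A single UPDATE both picks and marks the row, so no separate
        # locking read is needed.
        cur.execute(
            "UPDATE jobs SET status = 'RUNNING', process_id = %s "
            "WHERE status = 'QUEUED' ORDER BY id LIMIT 1",
            (token,),
        )
        if cur.rowcount == 0:
            conn.rollback()
            return None
        cur.execute(
            "SELECT id FROM jobs "
            "WHERE process_id = %s AND status = 'RUNNING'",
            (token,),
        )
        job_id = cur.fetchone()[0]
        conn.commit()
        return token, job_id
    finally:
        cur.close()
```

Keeping the token around also lets the worker later mark its own job as finished with `WHERE process_id = <token>`.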

Upvotes: 3
