Reputation: 12791
Say I have multiple workers that can concurrently read and write against a MySQL table (e.g. `jobs`). The task for each worker is:
1. Find the oldest `QUEUED` job
2. Set the status of that job to `RUNNING`
Note that there may not be any qualifying (i.e. `QUEUED`) jobs when a worker runs step #1.
I have the following pseudo-code so far. I believe I need to cancel (`ROLLBACK`) the transaction if step #1 returns no jobs. How would I do that in the code below?
BEGIN TRANSACTION;
# Update the status of jobs fetched by this query:
SELECT id from jobs WHERE status = "QUEUED"
ORDER BY created_at ASC LIMIT 1;
# Do the actual update, otherwise abort (i.e. ROLLBACK?)
UPDATE jobs
SET status="RUNNING"
# HERE: Not sure how to make this conditional on the previous ID
# WHERE id = <ID from the previous SELECT>
COMMIT;
Upvotes: 6
Views: 1654
Reputation: 142306
(This is not an answer to the question, but a list of caveats that you need to be aware of when using any of the real Answers. Some of these have already been mentioned.)
Put a `TIMESTAMP` on the rows so that a separate (cron/EVENT/whatever) job can discover which tasks are long overdue and clear them.
Upvotes: 0
Reputation: 2462
Adding `SKIP LOCKED` to the locking `SELECT` query, and running the whole job inside a SQL transaction that is committed only when the job is done, avoids jobs getting stuck in status `RUNNING` if a worker crashes (because the uncommitted transaction is rolled back). `SKIP LOCKED` is now supported in recent versions of most common DBMSs, including MySQL 8.0.
See:
Select only unlocked rows mysql
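A minimal sketch of this pattern, assuming MySQL 8.0 or later and the `jobs` table from the question. The `COMPLETED` status for finished jobs is an assumption for illustration. Because the row lock is held for as long as the job runs, this probably suits short jobs better than long ones.

```sql
START TRANSACTION;

-- Reset the variable: SELECT ... INTO leaves it unchanged when no row matches.
SET @id = NULL;

-- Lock the oldest queued job that no other worker has locked yet.
SELECT id INTO @id
FROM jobs
WHERE status = 'QUEUED'
ORDER BY created_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED;

-- ... the worker processes job @id here, while the row lock is still held ...

-- Assumed final status. If @id is NULL (nothing queued), this matches no rows.
UPDATE jobs SET status = 'COMPLETED' WHERE id = @id;

COMMIT;
```

If the worker's connection dies before `COMMIT`, the transaction is rolled back and the row is still `QUEUED` for the next worker.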
Upvotes: 0
Reputation: 562428
I am implementing something very similar to your case this week. A number of workers, each grabbing the "next" row in a set of rows to work on.
The pseudocode is something like this:
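BEGIN;
-- Reset first: if no row matches, SELECT ... INTO leaves @id unchanged.
SET @id = NULL;
SELECT id INTO @id FROM mytable WHERE status = 'QUEUED' LIMIT 1 FOR UPDATE;
-- If @id is still NULL, this UPDATE matches no rows.
UPDATE mytable SET status = 'RUNNING' WHERE id = @id;
COMMIT;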
Using `FOR UPDATE` is important to avoid race conditions, i.e. more than one worker trying to grab the same row.
See https://dev.mysql.com/doc/refman/8.0/en/select-into.html for information about `SELECT ... INTO`.
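For the explicit `ROLLBACK` the question asks about, one option is to wrap the logic in a stored procedure, where `IF` is available. The following is only a sketch: the procedure name `claim_next_job` and its `OUT` parameter are hypothetical, and it uses the `jobs` table and `created_at` ordering from the question.

```sql
DELIMITER //

CREATE PROCEDURE claim_next_job(OUT claimed_id INT)
BEGIN
  SET claimed_id = NULL;

  -- Inside a stored program, BEGIN opens a block, so use START TRANSACTION.
  START TRANSACTION;

  -- When no row matches, claimed_id stays NULL and execution continues.
  SELECT id INTO claimed_id
  FROM jobs
  WHERE status = 'QUEUED'
  ORDER BY created_at ASC
  LIMIT 1
  FOR UPDATE;

  IF claimed_id IS NULL THEN
    ROLLBACK;   -- nothing queued, nothing to change
  ELSE
    UPDATE jobs SET status = 'RUNNING' WHERE id = claimed_id;
    COMMIT;
  END IF;
END //

DELIMITER ;
```

A worker could then run `CALL claim_next_job(@job_id); SELECT @job_id;` and treat `NULL` as "no work available".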
Upvotes: 3
Reputation: 31812
It's still not quite clear what you are after. But assuming your task is: find the next `QUEUED` job, set its status to `RUNNING`, and select the corresponding ID.
In a single-threaded environment, you can just use your code. Fetch the selected ID into a variable in your application code and pass it to the UPDATE query in the WHERE clause. You don't even need a transaction, since there is only one writing statement. You can mimic this in an SQL script.
Assuming this is your current state:
| id  | created_at          | status    |
| --- | ------------------- | --------- |
| 1   | 2020-06-15 12:00:00 | COMPLETED |
| 2   | 2020-06-15 12:00:10 | QUEUED    |
| 3   | 2020-06-15 12:00:20 | QUEUED    |
| 4   | 2020-06-15 12:00:30 | QUEUED    |
You want to start the next queued job (which has id=2).
SET @id_for_update = (
SELECT id
FROM jobs
WHERE status = 'QUEUED'
ORDER BY id
LIMIT 1
);
UPDATE jobs
SET status = 'RUNNING'
WHERE id = @id_for_update;
SELECT @id_for_update;
You will get
| @id_for_update |
| -------------- |
| 2              |
from the last select. And the table will have this state:
| id  | created_at          | status    |
| --- | ------------------- | --------- |
| 1   | 2020-06-15 12:00:00 | COMPLETED |
| 2   | 2020-06-15 12:00:10 | RUNNING   |
| 3   | 2020-06-15 12:00:20 | QUEUED    |
| 4   | 2020-06-15 12:00:30 | QUEUED    |
If you have multiple processes that start jobs, you would need to lock the row with `FOR UPDATE`. But that can be avoided using `LAST_INSERT_ID()`:
Starting from the state above, with job 2 already running:
UPDATE jobs
SET status = 'RUNNING',
id = LAST_INSERT_ID(id)
WHERE status = 'QUEUED'
ORDER BY id
LIMIT 1;
SELECT LAST_INSERT_ID(), ROW_COUNT();
You will get:
| LAST_INSERT_ID() | ROW_COUNT() |
| ---------------- | ----------- |
| 3 | 1 |
And the new state is:
| id  | created_at          | status    |
| --- | ------------------- | --------- |
| 1   | 2020-06-15 12:00:00 | COMPLETED |
| 2   | 2020-06-15 12:00:10 | RUNNING   |
| 3   | 2020-06-15 12:00:20 | RUNNING   |
| 4   | 2020-06-15 12:00:30 | QUEUED    |
If the UPDATE statement affected no row (there were no queued rows), `ROW_COUNT()` will be `0`.
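When no row is updated, `LAST_INSERT_ID(id)` is never evaluated, so `LAST_INSERT_ID()` keeps whatever value it had earlier in the session. A small sketch of guarding against that stale value follows; the variable name `@claimed` and the column alias `claimed_job_id` are made up for illustration.

```sql
UPDATE jobs
SET status = 'RUNNING',
    id = LAST_INSERT_ID(id)
WHERE status = 'QUEUED'
ORDER BY id
LIMIT 1;

-- Capture the row count immediately: it reflects only the previous statement.
SET @claimed = ROW_COUNT();

-- Return the claimed id, or NULL when no queued job was found.
SELECT IF(@claimed = 1, LAST_INSERT_ID(), NULL) AS claimed_job_id;
```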
There might be some risks that I am not aware of. But this is also not really how I would approach it. I would rather store more information in the `jobs` table. Simple example:
CREATE TABLE jobs (
id INT auto_increment primary key,
created_at timestamp not null default now(),
updated_at timestamp not null default now() on update now(),
status varchar(50) not null default 'QUEUED',
process_id varchar(50) null default null
);
and
UPDATE jobs
SET status = 'RUNNING',
process_id = 'some_unique_pid'
WHERE status = 'QUEUED'
ORDER BY id
LIMIT 1;
Now a running job belongs to a specific process and you can just select it with
SELECT * FROM jobs WHERE process_id = 'some_unique_pid';
You might even like to have more information, e.g. `queued_at`, `started_at`, `finished_at`.
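One possible use of the extra columns is a periodic cleanup that requeues jobs whose worker appears to have died. This is only a sketch against the `jobs` schema above: the 15-minute threshold is an arbitrary assumption, and it relies on `updated_at` changing only when the row is written, so a legitimately long job would need to touch its row now and then to avoid being requeued.

```sql
-- Requeue jobs that have been RUNNING without any row update for too long.
UPDATE jobs
SET status = 'QUEUED',
    process_id = NULL
WHERE status = 'RUNNING'
  AND updated_at < NOW() - INTERVAL 15 MINUTE;
```

This could be run from cron or a MySQL `EVENT`.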
Upvotes: 3