The best way to use a DB table as a job queue (a.k.a batch queue or message queue)


The best way to implement a job queue in a relational database system is to use SKIP LOCKED.

SKIP LOCKED is a lock acquisition option that applies to both read/share (FOR SHARE) and write/exclusive (FOR UPDATE) locks, and it is widely supported nowadays:

  • Oracle 10g and later
  • PostgreSQL 9.5 and later
  • SQL Server 2005 and later (through the READPAST table hint)
  • MySQL 8.0 and later

Now, consider the following post table:

[Figure: the post table, with the columns id, title, body, and status]

The status column is used as an Enum, having the values of:

  • PENDING (0),
  • APPROVED (1),
  • SPAM (2).

If we have multiple concurrent users trying to moderate the post records, we need a way to coordinate their efforts to avoid having two moderators review the same post row.

SKIP LOCKED is exactly what we need. Suppose two concurrent users, Alice and Bob, execute the following SELECT queries, which lock the post records exclusively and also add the SKIP LOCKED option:

[Alice]:

    SELECT
        p.id AS id1_0_,
        p.body AS body2_0_,
        p.status AS status3_0_,
        p.title AS title4_0_
    FROM
        post p
    WHERE
        p.status = 0
    ORDER BY
        p.id
    LIMIT 2
    FOR UPDATE OF p SKIP LOCKED

[Bob]:

    SELECT
        p.id AS id1_0_,
        p.body AS body2_0_,
        p.status AS status3_0_,
        p.title AS title4_0_
    FROM
        post p
    WHERE
        p.status = 0
    ORDER BY
        p.id
    LIMIT 2
    FOR UPDATE OF p SKIP LOCKED

We can see that Alice selects the first two entries while Bob selects the next two. Without SKIP LOCKED, Bob's lock acquisition request would block until Alice releases the locks on the first two records.
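
To make the difference between skipping and blocking concrete, here is a toy in-memory model of the scenario above, written in Python. It is not a database and does no real locking: the `PostTable` class, its method names, and the five sample rows are all made up for illustration. It only mimics the rule that a SKIP LOCKED query passes over rows another session has locked instead of waiting for them.

```python
from dataclasses import dataclass, field


@dataclass
class PostTable:
    """Toy stand-in for the post table: maps post id -> status (0 = PENDING)."""
    status_by_id: dict
    # Maps a locked post id to the session holding its exclusive lock.
    lock_owner: dict = field(default_factory=dict)

    def select_pending_skip_locked(self, session, limit):
        """Mimic: WHERE status = 0 ORDER BY id LIMIT n FOR UPDATE SKIP LOCKED."""
        picked = []
        for post_id in sorted(self.status_by_id):
            if len(picked) == limit:
                break
            if self.status_by_id[post_id] != 0:
                continue  # not PENDING
            holder = self.lock_owner.get(post_id)
            if holder is not None and holder != session:
                continue  # locked by another session: skip it, do not wait
            self.lock_owner[post_id] = session
            picked.append(post_id)
        return picked

    def commit(self, session, new_status_by_id):
        """Apply the moderation decisions, then release the session's locks."""
        for post_id, status in new_status_by_id.items():
            if self.lock_owner.get(post_id) != session:
                raise RuntimeError(f"{session} does not hold the lock on post {post_id}")
            self.status_by_id[post_id] = status
        for post_id in [p for p, s in self.lock_owner.items() if s == session]:
            del self.lock_owner[post_id]


posts = PostTable({1: 0, 2: 0, 3: 0, 4: 0, 5: 0})
alice_rows = posts.select_pending_skip_locked("alice", limit=2)
bob_rows = posts.select_pending_skip_locked("bob", limit=2)
print(alice_rows, bob_rows)  # [1, 2] [3, 4]

# Alice finishes her review: post 1 is APPROVED (1), post 2 is SPAM (2).
posts.commit("alice", {1: 1, 2: 2})
```

In a real database the locks are released by COMMIT or ROLLBACK, so a moderator who abandons a transaction automatically returns the rows to the pool.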


Here's what I've used successfully in the past:

MsgQueue table schema

    MsgId        identity      -- NOT NULL
    MsgTypeCode  varchar(20)   -- NOT NULL
    SourceCode   varchar(20)   -- process inserting the message -- NULLable
    State        char(1)       -- 'N'ew if queued, 'A'(ctive) if processing, 'C'ompleted, default 'N' -- NOT NULL
    CreateTime   datetime      -- default GETDATE() -- NOT NULL
    Msg          varchar(255)  -- NULLable

Your message types are what you'd expect - messages that conform to a contract between the process(es) inserting and the process(es) reading, structured with XML or your other choice of representation (JSON would be handy in some cases, for instance).

Then 0-to-n processes can be inserting, and 0-to-n processes can be reading and processing the messages. Each reading process typically handles a single message type. Multiple instances of a process type can be running for load-balancing.

The reader pulls one message and changes the state to "A"ctive while it works on it. When it's done, it changes the state to "C"omplete. It can delete the message or not, depending on whether you want to keep the audit trail. Messages of State = 'N' are pulled in MsgTypeCode/CreateTime order, so there's an index on MsgTypeCode + State + CreateTime.
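
The reader lifecycle described above can be sketched end to end as follows. This is a minimal illustration, not the original implementation: it uses Python's built-in sqlite3 module as a stand-in for SQL Server, so the column types, the `BEGIN IMMEDIATE` locking, the index name, and the function names `enqueue`, `get_message`, and `complete_message` are all assumptions made for the sketch.

```python
import sqlite3

# isolation_level=None puts sqlite3 in autocommit mode, so BEGIN/COMMIT are explicit.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.executescript("""
    CREATE TABLE MsgQueue (
        MsgId       INTEGER PRIMARY KEY AUTOINCREMENT,
        MsgTypeCode TEXT NOT NULL,
        SourceCode  TEXT,
        State       TEXT NOT NULL DEFAULT 'N',
        CreateTime  TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
        Msg         TEXT
    );
    CREATE INDEX IX_MsgQueue_Type_State_Time
        ON MsgQueue (MsgTypeCode, State, CreateTime);
""")


def enqueue(msg_type, msg, source=None):
    """Insert a new message; State defaults to 'N'."""
    conn.execute(
        "INSERT INTO MsgQueue (MsgTypeCode, SourceCode, Msg) VALUES (?, ?, ?)",
        (msg_type, source, msg),
    )


def get_message(msg_type):
    """Claim the oldest 'N' message of one type, mark it 'A', and return it."""
    conn.execute("BEGIN IMMEDIATE")  # take the write lock before reading
    try:
        # CURRENT_TIMESTAMP has one-second resolution, so MsgId breaks ties.
        row = conn.execute(
            "SELECT MsgId, Msg FROM MsgQueue"
            " WHERE MsgTypeCode = ? AND State = 'N'"
            " ORDER BY CreateTime, MsgId LIMIT 1",
            (msg_type,),
        ).fetchone()
        if row is not None:
            conn.execute("UPDATE MsgQueue SET State = 'A' WHERE MsgId = ?", (row[0],))
        conn.execute("COMMIT")
        return row  # (MsgId, Msg), or None when the queue is empty
    except Exception:
        conn.execute("ROLLBACK")
        raise


def complete_message(msg_id):
    """Mark a processed message 'C'; the row is kept as an audit trail."""
    conn.execute("UPDATE MsgQueue SET State = 'C' WHERE MsgId = ?", (msg_id,))


enqueue("EMAIL", "welcome user 1")
enqueue("EMAIL", "welcome user 2")
first = get_message("EMAIL")
second = get_message("EMAIL")
complete_message(first[0])
states = [r[0] for r in conn.execute("SELECT State FROM MsgQueue ORDER BY MsgId")]
print(first, second, states)
```

After this run the first message is complete and the second is still active, which is the kind of visibility the State column is meant to give.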

Variations:

  • A State of "E"rror.
  • A column for the reader process code.
  • Timestamps for state transitions.

This has provided a nice, scalable, visible, simple mechanism for doing a number of things like you are describing. If you have a basic understanding of databases, it's pretty foolproof and extensible.


Code from comments:

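    CREATE PROCEDURE GetMessage (
        @MsgType VARCHAR(20)  -- same length as MsgTypeCode
    )
    AS
    BEGIN
        DECLARE @MsgId INT

        BEGIN TRAN

        -- Pick the oldest new message of the requested type.
        -- UPDLOCK keeps the row locked until COMMIT; READPAST skips rows that
        -- another reader has locked, so two readers never claim the same message.
        SELECT TOP 1 @MsgId = MsgId
        FROM MsgQueue WITH (UPDLOCK, READPAST, ROWLOCK)
        WHERE MsgTypeCode = @MsgType
          AND State = 'N'
        ORDER BY CreateTime

        IF @MsgId IS NOT NULL
        BEGIN
            -- Mark the message active, then return it to the caller.
            UPDATE MsgQueue SET State = 'A' WHERE MsgId = @MsgId

            SELECT MsgId, Msg FROM MsgQueue WHERE MsgId = @MsgId
        END
        ELSE
        BEGIN
            -- Nothing queued: return a single row of NULLs.
            SELECT MsgId = NULL, Msg = NULL
        END

        COMMIT TRAN
    END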


Instead of leaving owner = NULL when a row isn't owned, point it at a fake "nobody" record. In Oracle, a B-tree index does not store rows whose indexed columns are all NULL, so a search for owner IS NULL cannot use a single-column index on owner and you might end up with a full table scan. (This applies to Oracle; SQL Server might be different.)
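
A minimal sketch of the "nobody" pattern follows, again in Python with the built-in sqlite3 module. The `job` and `worker` tables, the `owner_id` column, the sentinel id 0, and the `claim_next` helper are hypothetical names chosen for the example. SQLite is only a convenient stand-in here: it indexes NULLs itself, so the sketch shows the schema pattern rather than Oracle's index behaviour.

```python
import sqlite3

NOBODY_ID = 0  # hypothetical sentinel: the id of the fake "nobody" owner

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE worker (id INTEGER PRIMARY KEY, name TEXT NOT NULL);
    CREATE TABLE job (
        id       INTEGER PRIMARY KEY,
        payload  TEXT NOT NULL,
        -- NOT NULL: an unowned job points at the 'nobody' worker (id 0)
        owner_id INTEGER NOT NULL DEFAULT 0 REFERENCES worker (id)
    );
    CREATE INDEX idx_job_owner ON job (owner_id);
    INSERT INTO worker (id, name) VALUES (0, 'nobody'), (7, 'worker-7');
    INSERT INTO job (payload) VALUES ('a'), ('b'), ('c');
""")


def claim_next(worker_id):
    """Assign the lowest-id unowned job to worker_id; return its id or None."""
    with conn:  # commits on success, rolls back on error
        row = conn.execute(
            # An equality test on a real value, not "owner_id IS NULL".
            "SELECT id FROM job WHERE owner_id = ? ORDER BY id LIMIT 1",
            (NOBODY_ID,),
        ).fetchone()
        if row is None:
            return None
        conn.execute("UPDATE job SET owner_id = ? WHERE id = ?", (worker_id, row[0]))
        return row[0]


claimed = [claim_next(7) for _ in range(4)]
unowned = conn.execute(
    "SELECT COUNT(*) FROM job WHERE owner_id = ?", (NOBODY_ID,)
).fetchone()[0]
print(claimed, unowned)
```

The sketch runs on a single connection; with several concurrent workers the SELECT would also need a row lock so that two workers cannot claim the same job.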