Queues in Postgres
At Netflix, part of the infrastructure I work on is a distributed priority queue built on Redis. It’s great in many ways, but also pretty expensive and complex.
I’ll detail that another time. For now, I’m going to focus on a much simpler way to solve this if your constraints are looser than ours. This is based on Postgres but would work for most relational databases. I haven’t load tested it and can’t really tell you how it scales, but it should be “good enough” for most queueing under ~1k QPS and <1M messages (and probably a fair amount higher, but again, I haven’t load tested it).
Note: This approach was used in our golang-index as well as in Go’s pkgsite Postgres queue.
Schema
-- This is the queue table.
CREATE TABLE IF NOT EXISTS queue_tasks (
    -- Bookkeeping.
    id BIGSERIAL PRIMARY KEY,
    -- Each consumer/dequeue-er needs a unique task name. appID+threadID for example.
    task_name TEXT UNIQUE NOT NULL,
    -- [...] Whatever payload you need for each message goes here. The dequeue
    -- queries below return module_path and version as example payload columns.
    -- Queueing.
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    started_at TIMESTAMPTZ
    -- Optionally also add completed_at and maybe success/failure columns if you
    -- want a history; otherwise just delete on finish.
);
CREATE INDEX IF NOT EXISTS idx_queue_tasks_started_created
    ON queue_tasks (started_at, created_at);
Enqueueing
Enqueue with an atomic test-and-set query:
INSERT INTO queue_tasks (...) -- All columns except id, created_at, and started_at.
VALUES (...)
ON CONFLICT (task_name) DO NOTHING; -- Another consumer won the race; no-op.
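As a concrete illustration, here is a minimal sketch of an enqueue that also tells the caller whether the row was actually inserted. The payload columns module_path and version are assumptions borrowed from the RETURNING clauses used later in this post, and the literal values are made up; substitute your own.

```sql
-- Hypothetical payload columns (module_path, version); adjust to your schema.
INSERT INTO queue_tasks (task_name, module_path, version)
VALUES ('example.com/mod@v1.2.3', 'example.com/mod', 'v1.2.3')
ON CONFLICT (task_name) DO NOTHING
-- RETURNING yields one row if the task was inserted and zero rows if a task
-- with this task_name already existed, so the caller can tell the two apart.
RETURNING id;
```

Checking the returned row count is optional. If you don’t care who won the race, you can drop the RETURNING clause.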
Dequeueing
Dequeue with a test for stalled workers:
WITH next_task AS (
    SELECT id
    FROM queue_tasks
    WHERE started_at IS NULL
       -- Allow accepting work from stalled workers.
       OR started_at + INTERVAL '5 minutes' < NOW()
    ORDER BY created_at ASC
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
UPDATE queue_tasks
SET started_at = NOW()
WHERE id = (SELECT id FROM next_task)
RETURNING id, module_path, version, started_at;
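The dequeue above only claims a task; the worker still has to finish it. Here is a minimal sketch, assuming the delete-on-finish approach mentioned in the schema comment. The named placeholders :id and :started_at are hypothetical and stand for the values returned by the dequeue, bound by whatever client library you use. It also assumes your client round-trips the timestamp without losing precision.

```sql
-- On success: remove the task. Matching on started_at as well makes this a
-- no-op if the task was declared stalled and re-claimed by another worker.
DELETE FROM queue_tasks
WHERE id = :id
  AND started_at = :started_at;

-- On a failure you want retried right away: release the claim so the task
-- becomes visible to the next dequeue.
UPDATE queue_tasks
SET started_at = NULL
WHERE id = :id
  AND started_at = :started_at;
```

If either statement affects zero rows, the worker no longer owns the task, most likely because it exceeded the five-minute stall window.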
With priority
Simply add a priority column of your choosing and then ORDER BY priority, created_at
instead of ORDER BY created_at alone, so that tasks of equal priority are still served oldest first.
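A sketch of what that could look like, assuming an integer priority column where smaller numbers are more urgent. The column type, the default of 100, and the sort direction are illustrative choices, not something the original setup prescribes.

```sql
-- Assumption: smaller numbers are more urgent; 100 is an arbitrary default.
ALTER TABLE queue_tasks
    ADD COLUMN IF NOT EXISTS priority INTEGER NOT NULL DEFAULT 100;

WITH next_task AS (
    SELECT id
    FROM queue_tasks
    WHERE started_at IS NULL
       OR started_at + INTERVAL '5 minutes' < NOW()
    -- Most urgent first; oldest first within the same priority.
    ORDER BY priority ASC, created_at ASC
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
UPDATE queue_tasks
SET started_at = NOW()
WHERE id = (SELECT id FROM next_task)
RETURNING id, priority, started_at;
```

If the queue grows large, you may also want an index that leads with priority, but whether that pays off depends on your data.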
With a consumer limit
If you only want n consumers, simply add `LIMIT
With lease extension
If you need lease extension, you can add a leased_until column and change the
dequeue query to:
WITH next_task AS (
    SELECT id
    FROM queue_tasks
    WHERE started_at IS NULL
       -- Allow accepting work from stalled workers, but now based on
       -- leased_until, which workers are expected to extend periodically.
       OR leased_until < NOW()
    ORDER BY created_at ASC
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
UPDATE queue_tasks
SET started_at = NOW(),
    leased_until = NOW() + INTERVAL '5 minutes' -- Or however long you want leases.
WHERE id = (SELECT id FROM next_task)
RETURNING id, module_path, version, started_at;
And then have your workers periodically update leased_until to some time in
the near future.
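The two pieces this needs, the extra column and the periodic heartbeat, could be sketched as follows. The placeholders :id and :started_at are hypothetical and stand for the values the worker got back from the dequeue; the five-minute lease simply mirrors the query above.

```sql
-- One-time schema change: the lease deadline column used by the dequeue.
ALTER TABLE queue_tasks
    ADD COLUMN IF NOT EXISTS leased_until TIMESTAMPTZ;

-- Heartbeat, run by the worker well before the lease expires.
UPDATE queue_tasks
SET leased_until = NOW() + INTERVAL '5 minutes'
WHERE id = :id
  AND started_at = :started_at  -- Still the claim this worker made.
  AND leased_until > NOW()      -- The lease has not already expired.
RETURNING leased_until;
```

If the heartbeat returns no row, the worker has lost the lease and should stop working on the task, since another worker may already have picked it up.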
With batching
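If you need to batch dequeue, raise LIMIT 1 to your batch size and change WHERE id = (...) to WHERE id IN (...). Note that the query below still hands out one task at a time; what it adds is a check that fewer than :n tasks are in flight, which caps the number of concurrent consumers at :n: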
WITH active_workers AS (
    -- Count tasks that are currently in flight (started and not yet stalled).
    SELECT COUNT(*) AS current_count
    FROM queue_tasks
    WHERE started_at IS NOT NULL
      AND started_at + INTERVAL '5 minutes' >= NOW()
),
next_task AS (
    SELECT id
    FROM queue_tasks
    -- Only hand out work while fewer than :n tasks are in flight.
    WHERE (SELECT current_count FROM active_workers) < :n
      AND (started_at IS NULL OR started_at + INTERVAL '5 minutes' < NOW())
    ORDER BY created_at ASC
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
UPDATE queue_tasks
SET started_at = NOW()
WHERE id = (SELECT id FROM next_task)
RETURNING id, module_path, version, started_at;
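For claiming several tasks in a single round trip, here is a sketch based on the basic dequeue query. The :batch_size placeholder is hypothetical, and the five-minute stall window is carried over from the earlier queries.

```sql
WITH next_tasks AS (
    SELECT id
    FROM queue_tasks
    WHERE started_at IS NULL
       OR started_at + INTERVAL '5 minutes' < NOW()
    ORDER BY created_at ASC
    -- Claim up to :batch_size tasks instead of just one.
    LIMIT :batch_size
    FOR UPDATE SKIP LOCKED
)
UPDATE queue_tasks
SET started_at = NOW()
-- IN rather than = because the CTE can now return several rows.
WHERE id IN (SELECT id FROM next_tasks)
RETURNING id, started_at;
```

The rows come back in no guaranteed order, so sort them in the worker if processing order matters. Keep in mind that every task in the batch shares the same stall deadline, so the batch should be small enough to finish within it.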
Conclusion
SQL databases are pretty easy to turn into queueing databases, and it’s fairly easy to tweak your SQL scripts to make the queue behave the way you want it to.
If you already rely on a SQL database and need to add queueing, consider just doing it in your pre-existing SQL database rather than adding a whole new system dependency.