During a project last year I came to think about how I could use Redis as a message queue. I wanted to use Redis and Node.js instead of introducing a separate product to the application stack. This is an overview of how I went about it.
The Redis documentation describes the reliable queue pattern. To implement this pattern, the only components needed are a Redis server and a worker process. In my case the worker was a dedicated Node.js process. Tasks are created by an arbitrary producer. In my case the producer was my main Node.js application, which needed to offload tasks to be processed asynchronously.
Note: For this overview, the steps are illustrated using the standard Redis command-line client (redis-cli) instead of actual code snippets.
The execution goes like this.
The producer inserts a task into a “work” queue (a list in Redis) using the LPUSH command. Below, three tasks are pushed into the work queue:
redis> LPUSH work "fetch_object:537cbe5c5257829b768f5e78"
(integer) 1
redis> LPUSH work "fetch_object:533eb6813c2f0b4d205aa0c5"
(integer) 2
redis> LPUSH work "fetch_object:537f29d534907c082de418e2"
(integer) 3
(For the purpose of understanding what happens in Redis, the content of the work queue is this:)
redis> LRANGE work 0 -1
1) "fetch_object:537f29d534907c082de418e2"
2) "fetch_object:533eb6813c2f0b4d205aa0c5"
3) "fetch_object:537cbe5c5257829b768f5e78"
Now there are three tasks waiting to be fetched by a worker. Each task is a simple string containing whatever the worker needs to process it. In the example above, the task names a fictitious fetch_object function followed by an object ID after the colon. What functionality the worker must implement to complete a task is, of course, specific to the application.
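In Node.js, the producer side could look something like the sketch below. It assumes the node-redis (v4) client and a `client` that is already connected; `makeTask` and `produce` are hypothetical helper names, not part of any library:

```javascript
// Build a task string in the "command:objectId" format shown above.
function makeTask(command, objectId) {
  return `${command}:${objectId}`;
}

// Producer side: push a task onto the head of the work queue.
// Assumes `client` is a connected node-redis (v4) client.
async function produce(client, command, objectId) {
  return client.lPush('work', makeTask(command, objectId));
}
```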
The worker polls the work queue by executing the blocking command BRPOPLPUSH, which fetches the last task (tail) from the work queue and pushes it onto a “processing” queue. (BRPOPLPUSH also returns the task to the worker, so no additional commands are needed to fetch it.)
Below, two worker processes, worker1 and worker2, poll the work queue with processing queues named worker1-queue and worker2-queue respectively:
redis> BRPOPLPUSH work worker1-queue 0
"fetch_object:537f29d534907c082de418e2"
redis> BRPOPLPUSH work worker2-queue 0
"fetch_object:533eb6813c2f0b4d205aa0c5"
BRPOPLPUSH performs the pop and the push atomically, which ensures that only one worker receives a given task even when several workers are polling the work queue at the same time.
(For the purpose of this overview, the content of the two processing queues and the work queue is this:)
redis> LRANGE worker1-queue 0 -1
1) "fetch_object:537f29d534907c082de418e2"
redis> LRANGE worker2-queue 0 -1
1) "fetch_object:533eb6813c2f0b4d205aa0c5"
redis> LRANGE work 0 -1
1) "fetch_object:537cbe5c5257829b768f5e78"
So now there’s one task in each of the processing queues and one remaining in the work queue.
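The polling step above could be sketched like this, again assuming the node-redis (v4) client and an already-connected `client`; `processingQueueName` and `fetchTask` are illustrative names of my own:

```javascript
// Name of a worker's dedicated processing queue, e.g. "worker1-queue".
function processingQueueName(workerName) {
  return `${workerName}-queue`;
}

// Worker side: block until a task is available, then atomically move it
// from the work queue to this worker's processing queue and return it.
// The trailing 0 means "block forever" rather than timing out.
async function fetchTask(client, workerName) {
  return client.brPopLPush('work', processingQueueName(workerName), 0);
}
```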
After BRPOPLPUSH has returned a task, the worker starts to process it by executing whatever needs to be executed. Once processing has finished, the task is removed from the processing queue (using the LREM command):
redis> LREM worker1-queue -1 "fetch_object:537f29d534907c082de418e2"
(integer) 1
Now the processing queue for worker1 is empty:
redis> LRANGE worker1-queue 0 -1
(empty list or set)
And the cycle repeats itself: the worker executes the blocking BRPOPLPUSH again until the next task is received.
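Putting the whole cycle together, a worker loop could look roughly like this. It assumes a connected node-redis (v4) client and a `handlers` object mapping command names (like fetch_object) to async functions; `parseTask` and `workLoop` are hypothetical names for this sketch:

```javascript
// Split a task string like "fetch_object:537cbe5c..." into its parts.
function parseTask(task) {
  const [command, objectId] = task.split(':');
  return { command, objectId };
}

// The full worker cycle: block for a task, process it, then remove it
// from the processing queue. Removing only after processing succeeds is
// what makes the queue "reliable" -- a crashed worker leaves the task
// behind in its processing queue instead of losing it.
async function workLoop(client, queueName, handlers) {
  for (;;) {
    const task = await client.brPopLPush('work', queueName, 0);
    const { command, objectId } = parseTask(task);
    await handlers[command](objectId);
    await client.lRem(queueName, -1, task);
  }
}
```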
I’ve implemented a proof of concept including a handler for calling webhooks. It’s available here: https://github.com/bmdako/BerlingskeMQ
The main weakness of this implementation is that each worker has its own unique processing queue. If a worker is taken out of production and deleted with unfinished tasks still left in its processing queue, the remaining workers have to find a way to automatically continue processing that particular queue. The first precaution is to avoid having any unfinished tasks in the processing queue when shutting down. In the POC above, if the worker is terminated gracefully with a SIGINT, all tasks in the processing queue are completed before the worker actually exits. Furthermore, when a worker starts, it resumes any unfinished tasks in its processing queue before fetching a new task from the global work queue.
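Both the startup resume and the graceful SIGINT shutdown boil down to the same operation: draining the processing queue, oldest task first. A sketch of that drain step, with `drainProcessingQueue` as an illustrative name and `client` assumed to be a connected node-redis (v4) client:

```javascript
// Process every task left in the processing queue, oldest (tail) first.
// Used both on startup (resume unfinished work) and on SIGINT (finish
// in-flight work before exiting). Peeking with LINDEX before removing
// with LREM means a crash mid-drain still leaves the task in the queue.
async function drainProcessingQueue(client, queueName, handleTask) {
  for (;;) {
    const task = await client.lIndex(queueName, -1);
    if (task === null) break;
    await handleTask(task);
    await client.lRem(queueName, -1, task);
  }
}
```

A worker would call this once at startup, and again from its SIGINT handler before disconnecting.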
With this cool pattern, Redis really shows its wide range of uses. And I really like the implementation: it's simple and lightweight, and it's easy to scale up the number of workers.