Redis’ reliable queue pattern

During a project last year I started thinking about how I could use Redis as a message queue. I wanted to stick with Redis and Node.js instead of introducing a separate product to the application stack. This is an overview of how I went about it.

The Redis documentation describes the reliable queue pattern. To implement it, the only components needed are a Redis server and a worker process; in my case the worker was a dedicated Node.js process. Tasks are created by an arbitrary producer. In my case the producer was my main Node.js application, which needed to offload tasks for asynchronous processing.

Note: In this overview the steps are illustrated using the standard Redis command-line client instead of actual application code.

The execution goes like this.

The producer inserts a task into a “work” queue (a list in Redis) using the LPUSH command. Below, three tasks are pushed onto the work queue:

redis> LPUSH work "fetch_object:537cbe5c5257829b768f5e78"
(integer) 1
redis> LPUSH work "fetch_object:533eb6813c2f0b4d205aa0c5"
(integer) 2
redis> LPUSH work "fetch_object:537f29d534907c082de418e2"
(integer) 3

(For the purpose of understanding what happens in Redis, the content of the work queue is this:)

redis> LRANGE work 0 -1
1) "fetch_object:537f29d534907c082de418e2"
2) "fetch_object:533eb6813c2f0b4d205aa0c5"
3) "fetch_object:537cbe5c5257829b768f5e78"

Now there are three tasks waiting to be fetched by a worker. Each task is a simple string containing whatever the worker needs in order to process it. In the example above, the task is a fictional fetch_object command followed by an object ID after the colon. What functionality the worker must implement to complete the task is of course specific to the application.
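
For example, a worker could split the task string on the colon to get a command name and an object ID. A tiny sketch (the variable names are just illustrative):

// A task is just a string; the convention here is "<command>:<objectId>".
const task = 'fetch_object:537cbe5c5257829b768f5e78';
const [command, objectId] = task.split(':');
// command === 'fetch_object', objectId === '537cbe5c5257829b768f5e78'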

The worker polls the work queue by executing the blocking command BRPOPLPUSH, which atomically pops the last task (the tail, i.e. the oldest) from the work queue and pushes it onto a “processing” queue. (BRPOPLPUSH also returns the task to the worker, so no additional command is needed to fetch it.)

Below, two worker processes, worker1 and worker2, poll the work queue with processing queues named worker1-queue and worker2-queue respectively:

redis> BRPOPLPUSH work worker1-queue 0
"fetch_object:537cbe5c5257829b768f5e78"
redis> BRPOPLPUSH work worker2-queue 0
"fetch_object:533eb6813c2f0b4d205aa0c5"

BRPOPLPUSH performs the pop and the push atomically, which ensures that each task is delivered to exactly one worker even when several workers are polling the work queue at the same time.
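
In worker code this step is a single blocking call. A minimal sketch, again assuming ioredis, with fetchTask as an illustrative name (a timeout of 0 means block indefinitely):

const Redis = require('ioredis');
const redis = new Redis();

// Atomically move the oldest task from "work" to this worker's
// processing queue; block until a task is available.
async function fetchTask(processingQueue) {
  // BRPOPLPUSH returns the task it moved, so no extra command
  // is needed to read it.
  return redis.brpoplpush('work', processingQueue, 0);
}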

(For the purpose of this overview, the content of the two processing queues and the work queue is this:)

redis> LRANGE worker1-queue 0 -1
1) "fetch_object:537cbe5c5257829b768f5e78"
redis> LRANGE worker2-queue 0 -1
1) "fetch_object:533eb6813c2f0b4d205aa0c5"
redis> LRANGE work 0 -1
1) "fetch_object:537f29d534907c082de418e2"

So now there’s one task in each of the processing queues and one remaining in the work queue.

After BRPOPLPUSH has yielded a task, the worker processes it by executing whatever needs to be executed. When processing has finished, the task is removed from the processing queue using the LREM command:

redis> LREM worker1-queue -1 "fetch_object:537cbe5c5257829b768f5e78"
(integer) 1

Now the processing queue for worker1 is empty:

redis> LRANGE worker1-queue 0 -1
(empty list or set)

And the cycle repeats itself: the worker executes the blocking BRPOPLPUSH again and waits until a new task is received.
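
Put together, the whole worker cycle might look like the sketch below. Again this assumes ioredis, and processTask stands in for whatever application-specific work the task requires:

const Redis = require('ioredis');
const redis = new Redis();
const PROCESSING_QUEUE = 'worker1-queue'; // unique per worker

async function processTask(task) {
  // Application-specific work goes here, e.g. parsing
  // "fetch_object:<id>" and fetching that object.
}

async function run() {
  for (;;) {
    // Block until a task can be moved from "work" to our processing queue.
    const task = await redis.brpoplpush('work', PROCESSING_QUEUE, 0);
    await processTask(task);
    // Done: remove the task from the processing queue (count -1
    // scans from the tail, where the oldest task sits).
    await redis.lrem(PROCESSING_QUEUE, -1, task);
  }
}

run().catch(console.error);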

I’ve implemented a proof of concept including a handler for calling webhooks. It’s available here: https://github.com/bmdako/BerlingskeMQ

The main weakness of this implementation stems from each worker having its own unique processing queue: if a worker is taken out of production and deleted while unfinished tasks are still left in its processing queue, the other workers need some way to automatically take over that particular queue. The first precaution is to avoid leaving unfinished tasks in the processing queue when shutting down. The POC above implements this so that if the worker is terminated gracefully with a SIGINT, all tasks in the processing queue are completed before the worker actually exits. Furthermore, when a worker starts, it resumes any unfinished tasks in its processing queue before fetching new tasks from the global work queue.
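
As a sketch of those two precautions (this is not the POC's exact code, just one way to express the idea on top of the loop above, reusing redis, PROCESSING_QUEUE and processTask from that sketch): a finite BRPOPLPUSH timeout lets the worker notice a shutdown flag between tasks, and a startup routine drains the processing queue before new work is fetched.

// Graceful shutdown: stop fetching new tasks on SIGINT and let
// the task currently being processed finish first.
let shuttingDown = false;
process.on('SIGINT', () => { shuttingDown = true; });

async function runGracefully() {
  await resumeUnfinished(); // drain leftovers before new work
  while (!shuttingDown) {
    // A 5-second timeout (instead of 0) lets the loop re-check the flag.
    const task = await redis.brpoplpush('work', PROCESSING_QUEUE, 5);
    if (task === null) continue; // timed out; check shuttingDown again
    await processTask(task);
    await redis.lrem(PROCESSING_QUEUE, -1, task);
  }
  redis.disconnect();
}

// On startup: finish tasks left over from a previous run, oldest first.
async function resumeUnfinished() {
  const leftovers = await redis.lrange(PROCESSING_QUEUE, 0, -1);
  for (const task of leftovers.reverse()) { // the list tail is oldest
    await processTask(task);
    await redis.lrem(PROCESSING_QUEUE, -1, task);
  }
}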

With this cool pattern, Redis really shows its wide range of uses. And I really like the implementation: it's simple and lightweight, and it's easy to scale up the number of workers.


10 thoughts on “Redis’ reliable queue pattern”

    1. I haven’t really tried any other libraries.
      And I haven’t used my own code for anything other than prototyping. But I’m confident enough that, given the opportunity, I would give it a shot in production using something based on this.

  1. I think the problem with this pattern is that, as `RPOPLPUSH` pops the latest item from the list, the oldest item can remain forever or be processed late if producers insert into the `work` queue very fast. How did you handle this?

    1. Nice catch: There’s an error in the example output from “LRANGE work 0 -1” after LPUSH’ing the tasks into the work queue. The LRANGE displays the list in the wrong order.

      I will update the example in the blogpost so you can see that the oldest task actually is the one being sent to the worker process.

  2. I like your write-up. Is there any specific reason that you’re using BRPOPLPUSH rather than RPOPLPUSH? From my understanding you could use both here.

    1. Thank you Andrew. Much appreciated. The key difference between the two is that BRPOPLPUSH is a blocking command and will not return an empty result when the queue is empty. This means that the worker process will wait until something can be fetched from the queue.

  3. I’m about to implement something similar, but was not imagining that each worker would have its own processing queue. Rather, they would all share one. This seems to simplify some of the failure cases. Why did you choose to have one processing queue per worker?

    1. I chose to have one processing queue per worker so that it would be easier to see whether a task had failed (e.g. the worker was killed) or a worker was still actively working on it, just by looking at the running processes. And I wanted each worker to resume its own tasks when started again. Also, it would be possible to implement some sort of “supervisor” logic, because all the processing queues together act as a sort of “worker registry”.

      But I guess it all depends on your implementation and how you prefer to handle stopping and starting of workers and the resuming of tasks.

      I think you’re right that having one shared processing queue has other benefits. One example I was thinking of is that you could add a timeout timestamp to each task, and on each cycle a worker would resume timed-out tasks.

      Would you care to elaborate on the simplifications you had in mind?

  4. The length of each worker’s processing queue should be 0 or 1 under normal conditions. But network problems may cause the queue to grow longer than 1, or leave some tasks unprocessed, depending on how you deal with these exceptions.
