Scalable Work Queues with Beanstalk
Any web application that reaches a certain critical mass eventually discovers that separating its services, where possible, is a great strategy for scaling. In fact, oftentimes a user action can be offloaded into a background task, which is handled asynchronously while the user continues to explore the site. However, coordinating this workflow does require some infrastructure: a message queue, or a work queue. The distinction between the two is subtle and blurry, but it carries important architectural implications. Should you pick a messaging bus such as AMQP or XMPP, roll your own database-backed system such as BJ, go with Resque, or evaluate the other three dozen variants available in every conceivable language?
Of course, there is no single answer to that question - it depends on your application. AMQP is a great power tool for message routing, but other systems can do a better job at specific tasks. One such tool is Beanstalkd: a simple and very fast work queue service rolled into a single binary - it is the memcached of work queues. Originally built to power the backend of the Causes Facebook app, it is a mature and production-ready open source project. It just seems that not too many people talk about it, perhaps precisely because it works so well.
Beanstalkd Features & Recipes
Adam Wiggins recently published a great comparison of Beanstalk to a few other work-queue services, and speed is where it stands out. A single instance of Beanstalk is perfectly capable of handling thousands of jobs a second (or more, depending on your job size) because it is an in-memory, event-driven system. Powered by libevent under the hood, it requires zero setup (launch and forget, a la memcached), and offers optional log-based persistence, an easily parsed ASCII protocol, and a rich set of tools for job management that go well beyond a simple FIFO work queue.
Out of the box, Beanstalk supports multiple 'tubes' (work queues), which are created and deleted on demand. In turn, each job is associated with a single tube and carries a number of attributes: a priority, a time to run (TTR), a delay, an id, and the opaque job body itself.
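To make those parameters concrete, here is a minimal sketch that speaks the ASCII protocol over a raw socket - the tube name, priority, delay, and TTR values are purely illustrative, and a local beanstalkd on the default port 11300 is assumed:

require 'socket'

# Minimal sketch: talk to a local beanstalkd (default port 11300) directly.
beanstalk = TCPSocket.new('127.0.0.1', 11300)

# Select (and implicitly create) a tube for subsequent puts.
beanstalk.write("use crawler-jobs\r\n")
puts beanstalk.gets                    # => "USING crawler-jobs"

# put <priority> <delay> <ttr> <bytes>, followed by the opaque job body.
body = "http://example.com/feed"
beanstalk.write("put 1024 0 120 #{body.bytesize}\r\n#{body}\r\n")
puts beanstalk.gets                    # => "INSERTED <id>"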
Once a job is inserted into the work queue, the server returns an ID, which we can use to inspect the job. From there, the queue itself is actually a priority heap! Need to jump ahead in line? Set a higher priority on the job and Beanstalk will do the rest. Or, what if your worker goes down while processing a job? Because you specified a time to run on the job, Beanstalk will monitor the checked-out job and put it back on the work queue if the timeout expires - seamless recovery, nice. Does the worker need more time to complete the job? There is a 'touch' command to notify the server to prolong the timeout. Have a bad job that you want to save for later inspection? Just bury it and take care of it later. Need to throttle all of the workers? You can pause the tube for a specified period of time. And there is more, so do check out the protocol specification.
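Continuing the raw socket sketch from above, a few of these commands in action - the priority and pause values are again just illustrative:

# Reserve jobs from our tube in addition to the default one.
beanstalk.write("watch crawler-jobs\r\n")
puts beanstalk.gets                       # => "WATCHING 2"

# Check out the next job: "RESERVED <id> <bytes>" followed by the body.
beanstalk.write("reserve\r\n")
id, bytes = beanstalk.gets.split[1, 2]
job_body = beanstalk.read(bytes.to_i + 2) # body plus trailing \r\n

beanstalk.write("touch #{id}\r\n")        # ask for more time on this job
puts beanstalk.gets                       # => "TOUCHED"

beanstalk.write("bury #{id} 1024\r\n")    # shelve a bad job for later inspection
puts beanstalk.gets                       # => "BURIED"

beanstalk.write("pause-tube crawler-jobs 60\r\n")  # throttle workers for 60 seconds
puts beanstalk.gets                       # => "PAUSED"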
Beanstalk at PostRank: Chronos
At PostRank we have dozens of Beanstalk processes sprinkled throughout our infrastructure, used for job management within a single machine and for coordination between entire clusters. The larger deployments, which are the front-line coordinators for our crawlers, serve 50+ million jobs on a daily basis without breaking a sweat. On average, each job is just several kilobytes, but the numbers add up: keeping the full set resident would require 60GB+ of RAM in a pure in-memory system. That is where the Beanstalk ASCII protocol, good old MySQL, and a little Ruby come together to create our scheduling system: Chronos.
The idea behind Chronos is based on a simple observation: we have tens of millions of crawler jobs, each of which is repeated on a custom interval, but only a small portion of that entire set needs to be in memory at any given time to make the system run! Out of that observation, two projects were born: em-jack and em-proxy. EM-Jack is a Ruby EventMachine client for Beanstalk, which provides a simple mechanism for defining custom command handlers that go beyond the native Beanstalk protocol. em-proxy, in turn, is a protocol-agnostic TCP proxy, which allows us to intercept any TCP data stream and manipulate it at will.
So, instead of talking directly to Beanstalk, all the traffic is routed through our custom em-proxy server (~150 LOC), which parses the Beanstalk protocol, intercepts custom commands or simply inspects the "delay" parameter, and decides where each job should be routed: Beanstalk or the MySQL instance. Jobs that are scheduled at least one hour into the future are persisted to the database, which significantly reduces the memory footprint. Finally, in the background, the upcoming jobs are silently loaded into Beanstalk as their execution time approaches. Simple, reliable, scales well, and it gives us all the features available in Beanstalk for job management and coordination (plus the persistence and replication of MySQL). A quick em-jack API demo:
require 'eventmachine'
require 'em-jack'

EM.run do
  jack = EMJack::Connection.new

  # Insert a job with a 300 second time-to-run
  jack.put("my message", :ttr => 300) { |jobid| puts "put successful #{jobid}" }

  # Reserve the next available job and delete it once we are done
  j = jack.reserve
  j.callback do |job|
    puts job.jobid
    puts job.body
    jack.delete(job) { puts "Successfully deleted" }
  end
end
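And for the proxy side, a rough sketch of the Chronos routing idea, assuming the em-proxy API - the ports, the one-hour threshold, and the MySQL hand-off are illustrative assumptions, not the actual Chronos code:

require 'em-proxy'

Proxy.start(:host => "0.0.0.0", :port => 11300) do |conn|
  # The real beanstalkd instance sits behind the proxy on another port.
  conn.server :beanstalk, :host => "127.0.0.1", :port => 11301

  conn.on_data do |data|
    # put <priority> <delay> <ttr> <bytes>\r\n<body>\r\n
    if data =~ /\Aput \d+ (\d+) \d+ \d+\r\n/ && $1.to_i >= 3600
      # A job scheduled an hour or more out: this is where Chronos would
      # persist it to MySQL and reload it into Beanstalk closer to run time.
    end
    data  # relay the command to beanstalkd
  end

  conn.on_response { |backend, resp| resp }
end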
Architecture Limitations and Alternatives
There is an abundance of different job and message queue systems, in part because they seem so simple to write. However, if you have ever tried to build your own, you will also know that the scalability and the features available in Beanstalk are non-trivial to replicate. That said, there are a few limitations as well. Beanstalk does not currently support replication or any other form of high availability. Likewise, there is no native sharding (though a few clients support it), nor a native GUI. Other than that, it works, it is fast, it is easy to extend, and it is definitely worth a test drive.