Advanced Messaging & Routing with AMQP
Not all message queues are made equal. In the simplest case, a message queue is synonymous with an asynchronous protocol in which the sender and the receiver do not operate on the message at the same time. However, while this pattern is most commonly used to decouple distinct services (an intermediate mailbox, of sorts), the more advanced implementations also enable a host more advanced recipes: load balancing, queueing, failover, pubsub, etc. AMQP can do all of the above, and yesterday's announcement of RabbitMQ 1.7 (an open source AMQP broker) warrants a closer look.
Originally developed at JP Morgan as a vendor neutral wire and broker protocol, AMQP (Advanced Message Queuing Protocol) is, in fact, a general purpose messaging bus. The protocol itself is still under active development, but there are a variety of open source client and server implementations for it, as well as some big commercial supporters (RedHat, Microsoft, etc). In other words, it works, it is production ready, and I can vouch for it from personal experience - we stream tens of millions of messages through AMQP at PostRank on a daily basis.
AMQP vs XMPP: Features & Architecture
The AMQP vs XMPP debate has been raging for years now. On the surface they both look identical, but in reality there are a number of important distinctions. For example, presence is one of the central components of XMPP, but it is not part of the AMQP specification. XMPP uses XML, whereas AMQP has a binary protocol. AMQP has native support for a number of delivery use cases (at least once, exactly once, select subscribers, persistence, etc) and also a variety of exchange implementations which allow fine-grained control to where and how the messages are routed.
The AMQP spec is a fast and recommended read, but by a way of quick introduction, the core architectural components are: publisher, exchange, queue, and consumer. As you may have guessed, the publisher is the data producer which pushes messages to an exchange. Why is it called an exchange? Because the exchange is a routing engine which is responsible for delivering the messages to the right queues (exchanges never store messages). For example, a message may need to be routed to just a single queue (direct exchange), maybe the message should be forwarded to every queue (pubsub) in the list (fanout exchange), or perhaps the message should be routed based on a key (topic exchange).
Publishing & Consuming AMQP in Ruby
The type of exchange, message parameters, and the name of the attached queue can all contribute to the delivery and routing behavior of the message. However, for the sake of example, let's create a simple pubsub fanout exchange in Ruby:
require 'amqp'
AMQP.start(:host => 'localhost') do
# create a fanout exchange on the broker
exchange = MQ.new.fanout('multicast')
# publish multiple messages to fanout
exchange.publish('hello')
exchange.publish('world')
end
In order to consume the messages from an exchange the consumer needs to create a queue and then bind it to an exchange. A queue can be durable (survive between server restarts), or auto-deletable for cases when the queue should disappear if the consumer goes down. Best of all, once the queue is bound to an exchange, the messages are streamed to the client in real-time via a persistent connection (no polling!):
require 'amqp'
AMQP.start(:host => 'localhost') do
amq = MQ.new
# bind 'listener' queue to 'multicast' exchange
amq.queue('listener').bind(amq.fanout('multicast')).subscribe do |msg|
puts msg # process your message here
end
end
Advanced AMQP Recipes
The flexibility of the message and the exchange model is what makes AMQP such a powerful tool. Whenever a publisher generates a message, he can mark it as 'persistent' which will guarantee delivery through the broker - if there is an attached queue, it will accumulate messages until the consumer requests them. However, if you're streaming transient data (access logs, for example), you can also disable message persistence and not worry about overwhelming your broker. That's how you achieve 'exactly-once' vs 'at least once' semantics.
Trying to build a pubsub hub? Create a fanout exchange and attach as many queues as you want, each consumer will receive a copy of the message. Load balancing? Bind two workers to the same queue and the broker will automatically round-robin the messages (there is no upper limit on the number of workers). Failover? By default the AMQP broker does not require a message to be ACKed by a consumer, but with a simple configuration flag the messages will be kept on the server until the ACK is received. If the consumer goes down without ACKing a message, they will be automatically put back on the queue. Need to route a message based on a key? Topic exchange allows partial matching based on a message key that is set by the producer. Do you want to notify the producer if there are no subscribers attached to a queue? Set the immediate flag on the message and the broker will do all the work. Best of all, you can also compose these patterns to cover virtually any delivery use case!
AMQP Brokers & Ruby / Rails
There are a variety of available broker implementations: ZeroMQ, ActiveMQ, OpenAMQ, and RabbitMQ. Because the underlying protocol is still in flux, there is definitely some variation between all the implementations - do your homework. If you're looking for a speed demon, ZeroMQ claims a 15.4 microsend routing overhead (4+ million msgs/s). However, RabbitMQ is arguably the most stable and feature complete broker implementation. If you are a CentOS or a Fedora user, you'll be happy to know that it is now part of the distro (yum install rabbitmq-server
), otherwise follow the installation instructions.
Once the server is installed, follow the administration guide to start the broker. If you're looking for a RESTful or a GUI tool to help you configure the broker, drop in Alice on the same server. Like the SQL prompt? Install the BQL plugin and familiarize yourself with the syntax.The AMQP gem is probably the best choice when it comes to Ruby clients - it is asynchronous, it is fast, and it is in use by dozens of companies. If you're looking for a synchronous client, Carrot gem is the answer. If you're using async-observer plugin in your Rails projects, you can drop in async-observer-amqp to migrate to AMQP. In other words, it is easy to get started, it is incredibly powerful, and it has great library support for virtually every language. Give it a try.