One-Way RPC using AMQP as transport and THRIFT for communication
In this article I would like to present ‘thrift_amqp_transport’, a small library I’ve worked on in the past months. It allows you to do one way rpc (method calls) using AMQP as a transport layer. This will decouple the sender from the receiver completely. I will present some usage examples in the second part of this article.
What is AMQP? Igvita.com has recently featured a nice overview article about AMQP and Ruby. AMQP is a protocol for asynchronous messaging/ queuing over the network.
Thrift (also on igvita.com) is an RPC framework developed by Facebook (formerly) that got made an Apache project. It allows you to do cross language/ cross platform method calls. I am impressed by the clean design that went into this. The intermediary definition language that is used is surprisingly rich, allows you to aggregate data into types and to transmit hashes and lists.
A very basic example of AMQP usage
Your rails application wants to create directories on demand. Using AMQP, you can queue up new directories to be created like this:
require 'bunny'
Bunny.start do |connection|
work_queue = connection.queue('work')
work_queue.publish('create_directory foo')
end
A small sample worker might look like this:
require 'bunny'
Bunny.start do |connection|
work_queue = connection.queue('work')
work = work_queue.pop
unless work.payload == :queue_empty
command, *args = work.payload.split
# Execute the command
case command
when 'create_directory'
puts "Create directory #{args.first}"
end
else
puts "No work to do."
end
end
As you can see, any non-trivial use will implement some form of RPC-mechanism. (even this trivial example does) As long as you’re not transmitting only data, but want to act on that data, you will implement some kind of dispatch mechanism on the other end.
RPC instead of strings
To avoid writing a complex dispatch mechanism, it would probably be wise to
use some kind of RPC implementation to reduce complexity at this point. Using
‘thrift_amqp_transport
’, the above code would look like this:
require 'thrift_amqp_transport'
# Yes, its more setup code.
connection = Thrift::AMQP::Connection.start
transport = connection.client_transport('work_rpc')
protocol = Thrift::BinaryProtocol.new(transport)
client = MyService::Client.new(protocol)
client.createDirectory('foo')
This layers thrift binary messages atop the ‘work_rpc’ queue on your AMQP host. To your AMQP server, this looks like just another message. To receive work and do something, you would write this code:
require 'thrift_amqp_transport'
class MyServiceHandler
def createDirectory(name)
puts "Creating #{name}"
end
end
connection = Thrift::AMQP::Connection.start
handler = MyServiceHandler.new()
processor = MyService::Processor.new(handler)
server_transport = connection.server_transport('work_rpc')
server = Thrift::SimpleServer.new(processor, transport)
server.serve # never returns
As you can see, dispatching has completely gone from the code. You can now extend the service without rewriting any dispatcher/parser code, using thrift’s rich set of intermediate types in your method calls.
You might be wondering where all those new classes
(MyService::Processor, MyService::Client, ...
) suddenly come
from. Those are generated by thrift, starting from the service definition,
which would look like this in our case:
service MyService {
oneway void createDirectory(1:string name);
}
Some possible past&future uses of this
I am using this for queuing up work and acting on it like you can see in the example above. This is probably more complex (setup-wise) than other solutions, but allows me to restart workers at any time, since the work is stored on the queue in a persistent manner.
But of course, with the power of thrift and AMQP under the hood, some other uses become possible:
Round Robin Load Balancing
The library might offer the possibility to acknowledge a message once the
work is done. This would allow you to push work on the queue and get some
guarantees that its really becoming acted upon. If a worker pops a work item
and doesn’t acknowledge it, most AMQP implementations should redeliver the
work item at one of the next pop
s. (to the next worker)
Service Discovery (one to many broadcast)
With the current state of the code, you could already have a service broadcast its presence on the network, updating it with heartbeats.
Two-Way Messaging
I would be interested in extending this library to allow two-way messaging.
Status & Limitations
The library is under active development and will be extended in the future. Some of the above uses are not yet possible, patches are welcome.
You can get the source code here.