blog.absurd:li - press play on tape
November 5th 2009
Tagged amqp, thrift, ruby, queue, rpc

One-Way RPC using AMQP as transport and Thrift for communication

In this article I would like to present ‘thrift_amqp_transport’, a small library I’ve worked on over the past few months. It lets you make one-way RPC calls (method calls that return nothing to the caller) using AMQP as the transport layer. This decouples the sender from the receiver completely. I will present some usage examples in the second part of this article.

What is AMQP? Igvita.com recently featured a nice overview article about AMQP and Ruby. In short, AMQP is a protocol for asynchronous messaging and queuing over the network.

Thrift (also covered on igvita.com) is an RPC framework that was originally developed by Facebook and is now an Apache project. It allows you to make method calls across languages and platforms. I am impressed by the clean design that went into it. Its interface definition language is surprisingly rich: it lets you aggregate data into types and transmit hashes and lists.

A very basic example of AMQP usage

Your rails application wants to create directories on demand. Using AMQP, you can queue up new directories to be created like this:


  require 'bunny'

  Bunny.start do |connection|
    work_queue = connection.queue('work')

    work_queue.publish('create_directory foo')
  end

A small sample worker might look like this:


  require 'bunny'

  Bunny.start do |connection|
    work_queue = connection.queue('work')

    work = work_queue.pop

    unless work.payload == :queue_empty
      command, *args = work.payload.split

      # Execute the command 
      case command
        when 'create_directory'
          puts "Create directory #{args.first}"
      end
    else
      puts "No work to do."
    end
  end

As you can see, any non-trivial use will implement some form of RPC mechanism (even this trivial example does). As soon as you want to act on the data you transmit, rather than just move it around, you will end up implementing some kind of dispatch mechanism on the receiving end.
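
To make that concrete, here is a minimal sketch of what the case statement in the worker above tends to grow into. It is plain Ruby without any AMQP connection, and the class name StringDispatcher and its methods are made up for illustration.

```ruby
# Hypothetical hand-rolled dispatcher for string commands such as
# 'create_directory foo'. All names here are illustrative only.
class StringDispatcher
  def initialize
    @handlers = {}
  end

  # Register a block to run for a given command name.
  def on(command, &block)
    @handlers[command] = block
  end

  # Split the payload into a command and its arguments, then call the handler.
  def dispatch(payload)
    command, *args = payload.split
    handler = @handlers.fetch(command) do
      raise ArgumentError, "unknown command: #{command.inspect}"
    end
    handler.call(*args)
  end
end

dispatcher = StringDispatcher.new
dispatcher.on('create_directory') { |name| "Create directory #{name}" }

puts dispatcher.dispatch('create_directory foo')
```

Even this small version has to decide what to do with unknown commands, and a directory name containing a space would be split into two arguments. Those are exactly the details an RPC framework takes care of.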

RPC instead of strings

Rather than writing and maintaining such a dispatch mechanism yourself, it is probably wiser to let an RPC implementation handle it. Using ‘thrift_amqp_transport’, the above code would look like this:


  require 'thrift_amqp_transport'
  
  # Yes, it's more setup code.
  connection = Thrift::AMQP::Connection.start
  transport = connection.client_transport('work_rpc')
  protocol = Thrift::BinaryProtocol.new(transport)
  client = MyService::Client.new(protocol)

  client.createDirectory('foo')

This layers thrift binary messages atop the ‘work_rpc’ queue on your AMQP host. To your AMQP server, this looks like just another message. To receive work and do something, you would write this code:


  require 'thrift_amqp_transport'
  
  class MyServiceHandler
    def createDirectory(name)
      puts "Creating #{name}"
    end
  end
  
  connection = Thrift::AMQP::Connection.start
  handler = MyServiceHandler.new()
  processor = MyService::Processor.new(handler)
  server_transport = connection.server_transport('work_rpc')

  server = Thrift::SimpleServer.new(processor, server_transport)
  server.serve    # never returns

As you can see, the dispatching has disappeared from the code completely. You can now extend the service without rewriting any dispatcher or parser code, and use Thrift’s rich set of intermediate types in your method calls.

You might be wondering where all those new classes (MyService::Processor, MyService::Client, ...) suddenly come from. They are generated by Thrift from the service definition, which would look like this in our case:


  service MyService {
    oneway void createDirectory(1:string name);
  }

Some possible past and future uses of this

I am using this to queue up work and act on it, as in the example above. This is probably more complex (setup-wise) than other solutions, but it allows me to restart workers at any time, since the work is stored persistently on the queue.

But of course, with the power of thrift and AMQP under the hood, some other uses become possible:

Round Robin Load Balancing

The library could offer a way to acknowledge a message once the work is done. This would let you push work onto the queue and get some guarantee that it is really acted upon. If a worker pops a work item and doesn’t acknowledge it, most AMQP implementations should redeliver the item on one of the next pops, to the next worker.
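
The acknowledge-or-redeliver idea can be sketched with a toy in-memory queue. This is not the API of bunny or of ‘thrift_amqp_transport’. The class AckQueue and its methods are invented for illustration, and a real broker decides on its own when an unacknowledged message goes back on the queue (typically when the worker's connection goes away).

```ruby
# Toy in-memory model of acknowledge-or-redeliver semantics.
# All names are hypothetical; no AMQP server is involved.
class AckQueue
  def initialize
    @ready = []      # messages waiting for a worker
    @unacked = {}    # delivery tag => message handed out but not yet acknowledged
    @next_tag = 0
  end

  def publish(message)
    @ready.push(message)
  end

  # Hand the oldest message to a worker together with a delivery tag.
  # Returns nil when there is no work.
  def pop
    return nil if @ready.empty?
    tag = (@next_tag += 1)
    @unacked[tag] = @ready.shift
    [tag, @unacked[tag]]
  end

  # The worker confirms that the work is done; the message is dropped for good.
  def ack(tag)
    @unacked.delete(tag)
  end

  # Stands in for the broker noticing that a worker went away without
  # acknowledging: its messages return to the front of the queue.
  def requeue_unacked
    @ready.unshift(*@unacked.values)
    @unacked.clear
  end

  def size
    @ready.size
  end
end

queue = AckQueue.new
queue.publish('create_directory foo')

tag, work = queue.pop          # worker A takes the item...
queue.requeue_unacked          # ...and crashes before acknowledging it

tag, work = queue.pop          # worker B receives the same item
queue.ack(tag)                 # and acknowledges it once the work is done
puts "#{work} done, #{queue.size} items left"
```

The point of the sketch is the ordering: the item only disappears for good after the ack, so a crash between pop and ack costs a retry rather than a lost job.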

Service Discovery (one to many broadcast)

With the current state of the code, you could already have a service broadcast its presence on the network and keep that announcement up to date with periodic heartbeats.
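
As a rough sketch of the listening side of such a scheme: a registry records when each service was last heard from and treats services that stop sending heartbeats as gone. The class ServiceRegistry, the service names and the 30-second timeout are assumptions made for this example. In a real setup, heartbeat would be a oneway Thrift method invoked once per broadcast message, while here it is simply called directly.

```ruby
# Hypothetical listener side of heartbeat-based service discovery.
# No AMQP or Thrift involved; heartbeats are fed in by hand.
class ServiceRegistry
  def initialize(timeout)
    @timeout = timeout   # seconds without a heartbeat before a service counts as gone
    @last_seen = {}      # service name => time of last heartbeat
  end

  # Record that a service announced itself at the given time.
  def heartbeat(name, now = Time.now)
    @last_seen[name] = now
  end

  # Sorted names of all services whose last heartbeat is recent enough.
  def alive(now = Time.now)
    @last_seen.select { |_, seen| now - seen <= @timeout }.keys.sort
  end
end

registry = ServiceRegistry.new(30)
start = Time.at(0)

registry.heartbeat('thumbnailer', start)
registry.heartbeat('mailer', start)
registry.heartbeat('mailer', start + 25)   # only the mailer keeps sending heartbeats

p registry.alive(start + 10)   # both services are within the timeout
p registry.alive(start + 40)   # the thumbnailer has timed out
```

Since heartbeats are fire-and-forget announcements, they fit the one-way call model well: the sender never needs an answer.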

Two-Way Messaging

I would be interested in extending this library to allow two-way messaging.

Status & Limitations

The library is under active development and will be extended in the future. Some of the uses described above are not possible yet; patches are welcome.

You can get the source code here.