blog.absurd:li - press play on tape

The new Thrift / AMQP bridge: Explained

Abstract

Since this has grown into a really long article, Here’s what its about: I talk about RPC using thrift and AMQP. This can be setup in at least two different ways, one gives you round-robin load balancing, the other allows you to filter messages. At the end I provide you with a project roadmap and an invitation to contribute.

thrift_over_amqp becomes toamqp?

I have reworked the thrift_over_amqp plugin into a real gem that can be used everywhere. To do this, I’ve rewritten the code and cleaned up the javaish API mess. Must’ve been the contact stress with thrift APIs. But this is old news already.

Let me try to give you some new .. well ah .. news. Here’s a graphic list of operation modes for thrift calls when sent via an AMQP broker:

Thrift itself provides you just with the same old point to point procedure call (first schema). Where do the other combinations come from?, you ask. Those are pretty much what AMQP does for you. Using toamqp, you can take advantage of these other operation modes for RPC calls.

To say it with the words of the king – A little less conversation, a little more action please:

One to One example

About the simplest thing you can do with toamqp is to connect two points of your infrastructure via RPC. To do this, you need to write a thrift specification of the receiver:


  service receiver {
    /**
     * Let's say we have a search server somewhere, answering with a list
     * of search results to us:
     */
     list<string> search(1: string word)
  }

This is better than the usual getTime RPC call, since we ‘now what time it is’, right? As you can see, thrift has a rich set of types that can be sent over the wire. You can create new types yourself and even do exception handling.

You may be wondering what the ‘1: ’ is about in the list of arguments of our function. That is thrift’s micro-level versioning. You give all your parameters a unique id and have to make sure you only use that id when you’re talking about the exact same thing. I can rename ‘word’, as long as I use the same id; I can also introduce new parameters unilaterally and be assured that only receivers who have use for them will receive them.

Turning the specification in some code

You can now use thrift to generate Ruby code from that. This generally looks like this:


  > thrift --gen rb receiver.thrift

This only works if you install the whole thrift suite, not just the rubygem. Doing so will give you the compiler for the thrift IDL. I may someday include a new rake task generator into toamqp for making this step real easy. For now you are stuck with the command line.

You most likely get a ‘gen-rb’ subdirectory that contains three ruby files. We’re now just one step away from writing our sender/receiver pair. For the request sender (aka client), we write:


  require 'toamqp'    # requires thrift and bunny
  require 'gen-rb/receiver.rb'

  sender = TOAMQP.client('receiver', Receiver)

The first argument to .client is the name of the exchange that you want to use for all messages. Sender and receiver must use the same name here. Think of it as the address of the receiver.

The second argument is the module that contains all thrift generated classes. TOAMQP will dynamically look up the ‘Client’ class from that module and return you an instance of that client, connected to your queue server. Speaking of which: You can configure the connection attributes using the longish method #connection_attributes= on the connection manager:


  TOAMQP.connection_manager.connection_attributes = {
    :user => 'username', 
    :password => 'password', 
    :host => 'myqueue.mydomain.com'
  }

The default is a connect to localhost.

Server implementation

Thrift makes it easy to write a receiver for our service (a simple one at least):


  require 'toamqp'
  require 'gen-rb/receiver.rb'
  
  class ReceiverHandler < TOAMQP::Service::Base
    exchange 'receiver'   # same as we used above
    serves Receiver
    
    def search(query)
      # search implementation with proper exception handling
      return %w{a few search results}
    end
  end
  
  TOAMQP.server(ReceiverHandler.new).   # create the server
    serve                               # enter its main loop

This will create thrift server instance and call its #serve method that will singlethreadedly (is that a word?) serve requests from the sender. So doing


  sender.search('chunky bacon')

will return .. no chunky bacon anymore, think about the serialization issues, but a ruby array of your search results. This is all thanks to thrift - all that toamqp does at this point is connect the thrift endpoints through your message queue.

I have chosen to make the handler implementation lean towards ActiveRecord-style class declarations. This has some disadvantages: You really have to decide what you’re going to serve and how the exchange is going to be named, for all instances of the class. On the plus side, you are free to meddle with your own constructor – and you don’t have to call super, something I always seem to forget.

Note that there is a second argument to #server that allows you to specify which server class to instantiate. You can plug one of the evented or threaded servers that come with thrift in there, although I must admit that my tests are cruelly lacking in that area. Speaking of tests: toamqp also comes with a server called ‘TOAMQP::SpecServer’ that can help you write integration tests against the whole infrastructure. You should really read the specs in the ‘spec/integration’ subdirectory of toamqp to get an example of usage for that.

Extending to round robin load balancing

This is all very plain thrift stuff – nothing you could not also achieve with tcp transports. Let’s take advantage of your message broker (your queue) a little more. One of the things that a broker does as per specification is round-robin balancing of message receivers.

If we connect more than one receiver to the exact same exchange (a simplification of issues here, more down below), they will each get 50% (in the discret sense of the term, not the quantum mechanic sense) of the load. It’s really as simple as that, just run two servers. And nothing stops you from running 10, on different machines. Let me give you a small example of a forking server that starts more than one server process on one machine:


  $pids = []
  2.times do 
    $pids << fork { TOAMQP.server(ReceiverHandler.new).serve }
  end
  
  Signal.trap("CHLD") {
    Process.wait; 
    $pids.delete($?.pid)
  }
  
  sleep 1 until $pids.empty?

Of course, this is a brutal oversimplification of the issues involved. You might want to look at the threaded server that comes with thrift or/and at unicorn, a forking web server for more information about this.

One small warning at this point: Thrift itself is not thread safe, and neither is toamqp. You cannot just spawn a few Ruby threads and use the client in all of them; you might have to create one client object per thread. And you should really look at the servers that come with thrift, they solve many of the issues with threading you might have.

So road-robin balancing and load distribution over several machines is done, what is next?

Routing messages to the right server

Sometimes you don’t just have many servers of the same kind, but many servers with the same interface of different kind. toamqp supports this directly.

Let’s say you have two places to search for, your desk and your wastebasket. (Since we all know that important documents are stored there, right?) So ideally, you would want to search them both:


  desk        = TOAMQP.client('receiver', Receiver, { :location => :desk })
  wastebasket = TOAMQP.client('receiver', Receiver, { :location => :wastebasket })
  
  [desk, wastebasket].map { |sender| sender.search('important document') }

This will fail brutally after experimenting with the code above. The reason is that for this to work, you will need to redeclare the ‘receiver’ exchange on your broker to be of a different type. Before, it was direct (as in ‘send my messages directly to the receiver’), now it should be headers (as in ‘filter by the headers I send’). Luckily toamqp does this for you, provided you clear the old variant from your servers first. You can either do this by restarting your server or by manually issuing a delete call via ‘bunny’.

For really easy experimentation you should just change the exchange name to ‘receiver_filtered’. This way toamqp will declare a new exchange and your AMQP broker will not throw an error.

I’ll just show you how to implement a wastebasket receiver; the desk receiver should follow from that. Note that since they implement the same interface but have different implementations, you will really need two different classes:


  class Wastebasket < TOAMQP::Service::Base
    exchange 'receiver', match => { :location => :wastebasket }
    serves Receiver
    
    def search(query)
      # ...
    end
  end

Combining the layouts

These two layouts (Round-Robin Load Balancing and Header Routing) can of course be combined. Together these two will help with most if not all communication needs you might have.

Possible extensions, Status, Roadmap

The current released version of the library is 0.3.0. You should not be using that, however. If you really want to help me, please check out the repository from my github account and use the 0.3.1-rc-* branch. This will give you the latest fixes and help me discover issues with the 0.3.1 release. Mind you, there should not be too many now – I am using this code in production and have encountered but few surprises.

In about two weeks time, I will be releasing 0.3.1. This marks the end of the big rewrite. Once that is done, I would like to start working towards at least partial compatibility with txAMQP for 0.3.2.

Of course, many things are left to be done. Here’s my current wishlist:

  • A specialized rake task for building thrift files (I have something semi-finished laying around)
  • Wrap the client proxy object in a thread safe way. This could lead to a generic thread-safe-o-matic for client proxies.
  • Do some real testing with multithreaded server. Find the issues and the solutions.
  • Broadcasting to many receivers? Easy to do and would enhance the API for announcing presence, sending heartbeats, etc…

Please get in contact if any questions remain (almost impossible, if you read my drivel until here, you must be the expert by now). I welcome all contributions that are accompanied by specs.