The Queue Behind the Curtain

I will try to keep this short, sweet, and to the point. There's a lot that goes on behind the pretty face that is Sprout: a web of systems works together to gather, enrich, and store gobs of data quickly for our users. Messages are passed between these systems to coordinate work. This entry will focus on my recent experience designing for high availability with Apache's ActiveMQ.

After we successfully set up two instances of the ActiveMQ server pointing at a MySQL database on a separate external server (a difficult task in itself), we needed to test the failover/redundancy of the entire system. For those not familiar with ActiveMQ, there is a specific nomenclature, as with many CS-related topics. Here are a couple of basic terms:

  • Producer – Generates the jobs that are submitted to the ActiveMQ server.
  • Listener – Listens for jobs on the ActiveMQ server and consumes them.

The Code

My goal was to create a producer that sent unique jobs every n seconds while a simple Listener listened for these jobs and printed debugging information. Here's how I set this up. First I created a simple producer in Python that attempted to connect to one of the ActiveMQ servers; if that failed, it attempted to connect to the second ActiveMQ server; if neither attempt was successful, it waited for a period and then retried.

import time

SERVER1 = 'X.X.X.X'   # placeholder IPs
SERVER2 = 'X.X.X.X'
PORT = XXXXX          # placeholder port

# PublishClient, USER and PASSWORD come from the client library and config used here.
client = PublishClient(SERVER1, PORT)
activemq_ip = SERVER1

try:
    # Try the primary ActiveMQ server first.
    response = client.connect(USER, PASSWORD)
except Exception:
    # Primary is down; fall back to the secondary server.
    client = PublishClient(SERVER2, PORT)
    try:
        response = client.connect(USER, PASSWORD)
        activemq_ip = SERVER2
    except Exception:
        # Neither server responded: wait, bump the retry count and resend.
        # (This block runs inside the producer's send(self, mq_params) method.)
        if mq_params['count'] == 0:
            print "Could not connect to activemq."
            time.sleep(3)
            mq_params['count'] += 1
            self.send(mq_params)

Perhaps not the cleanest Python script, but for testing purposes it did the trick. On the other side of the queue, I created a Java listener to consume the jobs and log debugging information. Here's a snippet of the Listener, specifically the overridden JMS 'onMessage()' method in my FailoverListener class:

@Override
public void onMessage(Message msg) {
    if (msg instanceof BytesMessage) {
        BytesMessage bMsg = (BytesMessage) msg;
        StringBuilder buffer = new StringBuilder();

        try {
            // Read the message body into a string, one byte at a time.
            for (int i = 0; i < (int) bMsg.getBodyLength(); i++) {
                buffer.append((char) bMsg.readByte());
            }
        } catch (JMSException e) {
            System.out.println("Failed to convert byte msg to string");
        }

        String text = buffer.toString().trim();
        Gson gson = new Gson();
        FailoverPojo pojo = gson.fromJson(text, FailoverPojo.class);
        System.out.println("Received message: " + pojo.asJson());

        // Must call acknowledge on the Message object specifically.
        try {
            msg.acknowledge();
        } catch (JMSException e) {
            System.out.println("Failed to acknowledge message");
        }
    }
}
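The snippet above only covers message handling; the connection setup isn't shown. In ActiveMQ, dynamic client-side failover is typically handled by the failover: transport in the broker URL, which is presumably what's at work here. Below is a minimal sketch of such a setup; the wrapper class, broker IPs, credentials, and queue name are placeholders I've made up for illustration (61616 is just ActiveMQ's default OpenWire port).

import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;

public class FailoverListenerMain {
    public static void main(String[] args) throws Exception {
        // The failover: transport reconnects across the listed brokers automatically.
        String brokerUrl = "failover:(tcp://X.X.X.X:61616,tcp://X.X.X.X:61616)";
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);

        Connection connection = factory.createConnection("USER", "PASSWORD");
        connection.start();

        // CLIENT_ACKNOWLEDGE so onMessage() can call msg.acknowledge() itself, as above.
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Queue queue = session.createQueue("failover.test");  // placeholder queue name
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(new FailoverListener());

        System.in.read();  // keep the process alive while the listener runs
    }
}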
That ensures that the Java listener will dynamically fail over if the server it's currently listening to goes down.
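One other piece not shown in the listener snippet is FailoverPojo, the object the JSON job text is deserialized into. Its real fields depend on what the producer sends; the sketch below assumes just an id and a timestamp purely for illustration, with asJson() delegating back to Gson.

import com.google.gson.Gson;

// Hypothetical shape of the job payload; the real fields depend on the producer.
public class FailoverPojo {
    private int id;          // unique job number (assumed)
    private long timestamp;  // when the producer created the job (assumed)

    public String asJson() {
        // Serialize this object back to JSON for logging.
        return new Gson().toJson(this);
    }
}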

Now, here's the fun part. Hop onto each of your ActiveMQ servers over ssh and start each instance. Whichever grabs the database lock first becomes the active broker; the other sits waiting on the lock as a standby. Start your Java listener and make sure that it connects to your ActiveMQ server. Next, run the producer, which will send jobs to the ActiveMQ server every n seconds (as specified in the script). Lastly, kill the currently running ActiveMQ server and watch the other server grab the lock and pick up where the previous one left off without a hitch.

Here are some screenshots showing the live failover. The first is the Python producer sending jobs to the queue and the second is the Java listener consuming said jobs:

[Screenshots: Sprout queue code]

Depending on your retry threshold (the amount of time your Python script sleeps before trying to resend), one of your jobs may fail to submit during the downtime. If that happens, you will see the job retried and successfully submitted to the new ActiveMQ server, and on the other side your listener will have caught the switch and seamlessly grabbed the job from the new server.

That's it! Try experimenting with how low you can set the "retry threshold". I was able to get it down to roughly four seconds before jobs started to fail.

- Rob (@rmadd3n)