ActiveMQ, Camel and Message Expiry

I am a long-time user of Apache ActiveMQ. It’s served me well, and the tight integration with Apache Camel means I can interface with so many systems with relative ease.

In the past, I used to struggle with the concept of queues versus topics, persistent versus non-persistent messages, and message expiry times. In some ways, it feels like magic, but it’s deterministic, and we like deterministic, and we also like documentation.

Here’s a bit of a walk-through of each of those concepts in a way that makes sense to me.

Test Setup

There’s a freshly unpacked ActiveMQ archive already on my machine, and I’m in the ActiveMQ directory itself.

I’ve placed the following in conf/local-configuration.xml. It sets up a Camel context, defines a single bean to connect to the local ActiveMQ service, and three routes.

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.5.xsd">

  <bean id="activemq" class="org.apache.camel.component.activemq6.ActiveMQComponent" >
    <property name="brokerURL" value="vm://localhost"/>
  </bean>

  <camelContext xmlns="http://camel.apache.org/schema/spring" id="camel">

    <!-- Send a message every 1s, expire after 10s, persistent -->

    <route>
      <from uri="timer://foo?fixedRate=true&amp;period=1000"/>
      <to uri="activemq:queue:EXPIRE.10S.PERSIST?timeToLive=10000"/>
    </route>

    <!-- Send a message every 1s, expire after 10s, non-persistent -->

    <route>
      <from uri="timer://foo?fixedRate=true&amp;period=1000"/>
      <to uri="activemq:queue:EXPIRE.10S.NONPERSIST?timeToLive=10000&amp;deliveryMode=1"/>
    </route>

    <!-- Send a message every 1s, expire after 10s, non-persistent, priority 1 -->

    <route>
      <from uri="timer://foo?fixedRate=true&amp;period=1000"/>
      <to uri="activemq:queue:EXPIRE.10S.NONPERSIST.PRI1?timeToLive=10000&amp;deliveryMode=1&amp;priority=1"/>
    </route>

  </camelContext>

</beans>

This uses the ‘timer’ component to produce a message every second (1000ms) and route it to a queue with a customisable time-to-live, delivery mode and priority.

I’ve also added the following line in conf/activemq.xml, just after importing jetty.xml:

<import resource="local-configuration.xml"/>

Overview

Looking at the web interface to ActiveMQ at http://localhost:8161/admin/queues.jsp, you’ll see three queues: EXPIRE.10S.NONPERSIST, EXPIRE.10S.NONPERSIST.PRI1 and EXPIRE.10S.PERSIST.

The messages in EXPIRE.10S.PERSIST are marked as persistent, and the messages in the other queues are marked as non-persistent. The messages in EXPIRE.10S.NONPERSIST.PRI1 are priority 1 whereas the others are the default priority of 4.

Persistence

Messages are either persistent or non-persistent. Persistent messages are written to the message store on disk; non-persistent messages are not. As a result, if you restart your ActiveMQ server, non-persistent messages will disappear, and persistent messages will persist.

To see this in action, look at the user interface. You’ll see the same number of pending messages in all three queues. Now, restart the server by running ‘bin/activemq restart’.

Take a look again, and you’ll see that there are now more messages in EXPIRE.10S.PERSIST than in the non-persistent queues. Why is this? Well, the messages marked as persistent were restored from disk when the server restarted, whereas the messages in the two non-persistent queues were lost.

You might use persistent messages where you have an event that needs logging, e.g. a door being opened, and you need all the events. You might use non-persistent messages if you have a stream of messages from an electricity meter – it doesn’t matter if you miss a few on restart, because they’ll be generated again in a second.
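As a rough sketch of how that choice might look in a Camel route, the two routes below send door events persistently and meter readings non-persistently. The queue and timer names are made up for illustration and are not part of the test setup. The deliveryMode option takes the JMS values 2 (persistent) and 1 (non-persistent), and persistent is what you get when the option is left out.

```xml
<!-- Hypothetical routes; queue and timer names are illustrative only -->
<route>
  <from uri="timer://doorEvents?period=1000"/>
  <!-- deliveryMode=2 is JMS PERSISTENT (also the default) -->
  <to uri="activemq:queue:EXAMPLE.DOOR.EVENTS?deliveryMode=2"/>
</route>

<route>
  <from uri="timer://meterReadings?period=1000"/>
  <!-- deliveryMode=1 is JMS NON_PERSISTENT: lost on broker restart -->
  <to uri="activemq:queue:EXAMPLE.METER.READINGS?deliveryMode=1"/>
</route>
```

These would sit inside the camelContext element alongside the existing routes.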

Expiry

When you have nothing consuming from a queue, persistent and non-persistent messages will build up in the queue. Unless they’re consumed, they’ll sit there forever, consuming disk space and memory.

In each of our three queues, we’ve set an expiry, or time-to-live (that’s ‘live’ as in the opposite of death, rather than ‘live’ as the opposite of recorded) of 10 seconds.

Go back to the ActiveMQ web interface and look at the number of pending messages. This figure will be at least 10, and at most 40 – but why? Let me explain.

It’s computationally expensive to keep scanning queues to see if messages have expired, so by default ActiveMQ runs a task every 30 seconds to expire messages from queues. When we first start the server, the message count grows from 0 to 10, one message per second. At the 11th second, the first message will have expired, but it stays on the queue and still counts as pending, and we’ll have placed another message on the queue. The same thing happens every second until the expiry task next runs, at which point every expired message is removed and we’re left with the last 10 seconds of messages. The count then climbs by one a second for another 30 seconds, reaching about 40 just before the task runs again.
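If the 30 second interval is too coarse, the expiry task can be tuned per destination. The fragment below is a minimal sketch, assuming it goes inside the broker element of conf/activemq.xml (merged into any destinationPolicy that is already there); the 5000ms value is an arbitrary example, and the EXPIRE.> wildcard is chosen to match the three test queues.

```xml
<destinationPolicy>
  <policyMap>
    <policyEntries>
      <!-- Assumption: scan queues named EXPIRE.* for expired
           messages every 5s instead of the default 30s -->
      <policyEntry queue="EXPIRE.>" expireMessagesPeriod="5000"/>
    </policyEntries>
  </policyMap>
</destinationPolicy>
```

With a setting like this, the pending count should swing between roughly 10 and 15 rather than 10 and 40, at the cost of more frequent scans.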

Dead Letter Queue

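If you have ActiveMQ set up to use a dead letter queue (DLQ), then messages that expire will be sent to the DLQ. With the default dead letter strategy, this applies to persistent messages only; non-persistent messages are not sent to the DLQ unless processNonPersistent is enabled on the strategy. If you have a process which logs or reprocesses these messages, great. But if you don’t, then they’ll fill up the DLQ and give you the same problems as filling up a queue.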
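As an illustration of the "logs these messages" case, here is a small sketch of a Camel route that drains the DLQ into the log. It assumes the broker is using the default shared DLQ name, ActiveMQ.DLQ, and the log category name "dlq" is made up for this example.

```xml
<!-- Hypothetical route: consume dead letters and write them to the log -->
<route>
  <!-- Assumes the default shared dead letter queue, ActiveMQ.DLQ -->
  <from uri="activemq:queue:ActiveMQ.DLQ"/>
  <!-- "dlq" is an arbitrary log category; showHeaders also prints the JMS headers -->
  <to uri="log:dlq?showHeaders=true"/>
</route>
```

Because the route consumes the messages, the DLQ stays empty while you still have a record of what ended up there.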

It’s possible to use a plugin in activemq.xml to discard messages sent to the DLQ if you don’t care about them:

<plugins>
  <discardingDLQBrokerPlugin dropAll="true" dropTemporaryQueues="true" dropTemporaryTopics="true"/>
</plugins>

It’s often important to keep an eye on your DLQ, because messages arriving there indicate your clients aren’t consuming messages fast enough, so don’t make this plugin your default!
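A narrower alternative, sketched here under the assumption that you only want to keep expired messages out of the DLQ, is to switch off processExpired on the dead letter strategy rather than dropping everything. The fragment belongs in the destinationPolicy section of activemq.xml, and the ">" wildcard applies it to all queues.

```xml
<policyEntry queue=">">
  <deadLetterStrategy>
    <!-- Expired messages are discarded instead of dead-lettered;
         other undeliverable messages still reach the DLQ -->
    <sharedDeadLetterStrategy processExpired="false"/>
  </deadLetterStrategy>
</policyEntry>
```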
