It helps to have a message listener container

I couldn’t figure out why the AOP advice kept not running around the MessageListener’s onMessage() method in my integration test.  I had declared the MessageListener as a bean (the advice only gets applied to beans) and relaxed the pointcut expression so the listener wouldn’t have to reside in a certain subpackage; the setup looked good, but still no advice.  I ratcheted the logging level up and saw the message being sent, so why wasn’t the advice running?  I began to have that piteous, put-upon feeling you get when you think you’re doing everything right but the program is unreasonably refusing to cooperate.

Then I realized that I hadn’t declared a DefaultMessageListenerContainer in my Spring bean file.  And then I realized that I had only been seeing messages being sent, never received.  (The advice only applies when the message is received).  Oops!

I declared the message listener container bean, and messages began to be received with the advice applying as it should.  : )
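
For reference, here is a minimal sketch of the kind of container declaration that was missing. The bean ids (queueConnectionFactory, queue, messageListener) are placeholders assumed to be defined elsewhere in the bean file.

```xml
<!-- Without a container like this, nothing ever calls onMessage(), so the advice has no chance to run -->
<bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="queueConnectionFactory" />
    <property name="destination" ref="queue" />
    <property name="messageListener" ref="messageListener" />
</bean>
```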


Applying advice based on type

When we add JMS queue support to our architecture, we’ll want to apply some AOP advice each time a listener receives a message.  We want to have a convention for how the listeners will be set up so that our advice will find them.

In a recent communication with the design leads, I proposed that one way we could recognize a MessageListener is by a convention that its class name must match *MessageListener.

One of the design leads asked a great question: Could we go based on the class implementing the MessageListener interface instead of being named a certain way?  That would be slick, wouldn’t it?  I didn’t know the answer, though.  I only knew about matching based on name of the class/package/method/signature.

Starting

Too much advice

This pointcut expression works, but applies to too much:

execution(void com.ontsys..svc..*.onMessage(..))

We want to modify it to only apply to classes that implement the javax.jms.MessageListener interface.

Trying

An ineffectual plus

At first it looked like the plus sign was what we needed.  The AspectJ™ Programming Guide Appendix B. Language Semantics: Type patterns section on Subtype patterns says:

It is possible to pick out all subtypes of a type (or a collection of types) with the “+” wildcard. The “+” wildcard follows immediately a type name pattern.

And the Spring manual contains an example of the + wildcard on an execution pointcut.

My first attempt was a misuse of the plus sign, though I didn’t know it right away:

execution(void com.ontsys..svc..MessageListener+.onMessage(..))

That didn’t work: when I deployed the project, the advice wasn’t applied to my listener. (I could tell because the listener relies on something the AOP advice sets up, and it blew up when that wasn’t there.)

I see now that I was misusing the plus operator.  I wanted the advice to apply to any implementor/subclass of javax.jms.MessageListener, but the way I wrote it, it was as if I were asking for the advice to apply to any implementor/subclass of com.ontsys..svc..MessageListener.
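
For comparison, this is roughly what a correct use of the plus sign might look like, with the + attached to the fully-qualified interface name and a within() clause keeping the package restriction. It is an untested sketch for this setup, and the pointcut id is made up for illustration.

```xml
<aop:config>
    <!-- The + follows the type whose subtypes we want: javax.jms.MessageListener -->
    <aop:pointcut id="listenerSubtypesOnMessage"
        expression="within(com.ontsys..svc..*) and execution(void javax.jms.MessageListener+.onMessage(..))" />
</aop:config>
```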

Escaping from the Ampersands

The Spring manual’s AOP pointcut examples section shows an example using “target”:

target(com.xyz.service.AccountService)

and says it matches “any join point (method execution only in Spring AOP) where the target object implements the AccountService interface”

I tried this:

execution(void com.ontsys..svc..*.onMessage(..)) && target(MessageListener)

On deployment, though, I got an error about one of the ampersands…

And…

I modified the pointcut expression to use the word and instead:

execution(void com.ontsys..svc..*.onMessage(..)) and target(MessageListener)

Now I got this error:

warning no match for this type name: MessageListener [Xlint:invalidAbsoluteTypeName]

Success

Noticing that the Spring manual’s AOP pointcut examples all seemed to show fully-qualified types, I thought I’d try fully qualifying the name of the MessageListener interface:

execution(void com.ontsys..svc..*.onMessage(..)) and target(javax.jms.MessageListener)

This one worked. Woohoo!

Maybe I’m on my way to being a fully-qualified pointcut-expression writer.  :D
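
Putting it together, the working expression might sit in a bean file something like the sketch below. The pointcut id and the listenerAdvice bean are hypothetical names. The earlier ampersand error was most likely because a raw ampersand isn't legal inside an XML attribute value; it would have to be written as &amp;amp;&amp;amp;, which is why the and keyword is the friendlier choice in bean files.

```xml
<aop:config>
    <!-- The keyword "and" stands in for the double ampersand, which would otherwise need XML escaping -->
    <aop:pointcut id="messageListenerOnMessage"
        expression="execution(void com.ontsys..svc..*.onMessage(..)) and target(javax.jms.MessageListener)" />
    <!-- listenerAdvice is a hypothetical advice bean defined elsewhere in the bean file -->
    <aop:advisor advice-ref="listenerAdvice" pointcut-ref="messageListenerOnMessage" />
</aop:config>
```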

We’ll want to JNDI-lookup the queue, too…

In our system we have proved out looking up the JMS connection factory from JNDI, but up to this point we have declared the Queue bean using an <amq:queue> element in the application bean files.  This ties our application code to the ActiveMQ broker, which we don’t want.  We want to move toward looking up the connection factory and the queue from JNDI.  That way the application bean file only needs to know the JNDI name, and we can configure the queue in the connection factory datasource configuration file on the app server (e.g., activemq-jms-ds.xml), probably as an mbean element in there:


   <mbean code="org.jboss.resource.deployment.AdminObject" name="activemq.queue:name=TheMBeanNameYouSeeInTheJMXConsole">
      <attribute name="JNDIName">this/is/the/jndi/name/the/bean/files/will/use</attribute>
      <depends optional-attribute-name="RARName">jboss.jca:service=RARDeployment,name='activemq-rar-5.1.0.rar'</depends>
      <attribute name="Type">javax.jms.Queue</attribute>
      <attribute name="Properties">PhysicalName=theQueueName.YouWillSee.In.ActiveMQ</attribute>
   </mbean>
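
On the application side, the bean file could then look up both objects by name, something like this sketch. Whether the queue's JNDI name needs a java: prefix depends on how JBoss binds it, so treat the second name as an assumption.

```xml
<!-- Both lookups go through JNDI; no broker-specific elements are needed in the bean file -->
<jee:jndi-lookup id="queueConnectionFactory" jndi-name="java:activemq/QueueConnectionFactory" />
<jee:jndi-lookup id="queue" jndi-name="this/is/the/jndi/name/the/bean/files/will/use" />
```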

But Who Will Care for the Babies?

This will be fine for a production environment — but what about our integration tests?  Right now we use Bitronix Transaction Manager 1.3’s JNDI server facilities to provide JNDI lookup of the JDBC datasource for our integration tests.  I thought we’d also use BTM to look up the JMS connection factory… but BTM does not support JNDI-looking-up queues as well.  Ludovic says:

If you really need a complete JNDI server, you can use Tomcat’s which is quite easy to reuse. This is what I recommended using before BTM 1.3, there is an example here: http://docs.codehaus.org/display/BTM/Hibernate

That’s probably what we’ll need to look into when the time comes…

JMX meets the DefaultMessageListenerContainer

(It’s kind of like King Kong vs. Godzilla, only more peaceful)

According to the Javadoc, these properties on the DefaultMessageListenerContainer “can be modified at runtime, for example through JMX” :

concurrentConsumers
maxConcurrentConsumers
maxMessagesPerTask
idleTaskExecutionLimit

Armed with this and my handy Spring manual, I added the following to my beans file:


    <bean id="exporter" class="org.springframework.jmx.export.MBeanExporter" lazy-init="false">
        <property name="beans">
            <map>
                <entry key="bean:name=listenerContainer1" value-ref="listenerContainer" />
            </map>
        </property>
    </bean>

(In this beans file I already had a DefaultMessageListenerContainer bean with an id of listenerContainer.)

Now when I fire up my JBoss JMX Management, a new bean category appears:

I can click on that link and see several properties that I can change on the DefaultMessageListenerContainer bean, including ConcurrentConsumers, which is currently 1.  If I change that to 2 and press the Apply Changes button though, I get a stack trace in JBoss:

12:47:17,978 ERROR [[HtmlAdaptor]] Servlet.service() for servlet HtmlAdaptor threw exception java.lang.ClassNotFoundException: org.springframework.jms.support.destination.DestinationResolver

So we’ll have to figure out what that’s about.
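
One untested guess: the JMX console's class loader may not be able to see the Spring types behind some of the exposed attributes (destinationResolver, for instance). If so, it might help to expose only the handful of properties we actually want to tune. The sketch below uses Spring's MethodNameBasedMBeanInfoAssembler for that; the choice of methods is just an illustration.

```xml
<bean id="exporter" class="org.springframework.jmx.export.MBeanExporter" lazy-init="false">
    <property name="beans">
        <map>
            <entry key="bean:name=listenerContainer1" value-ref="listenerContainer" />
        </map>
    </property>
    <!-- Only the listed getters/setters become JMX attributes; everything else stays hidden -->
    <property name="assembler">
        <bean class="org.springframework.jmx.export.assembler.MethodNameBasedMBeanInfoAssembler">
            <property name="managedMethods"
                value="getConcurrentConsumers,setConcurrentConsumers,getMaxConcurrentConsumers,setMaxConcurrentConsumers" />
        </bean>
    </property>
</bean>
```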

The mysteriously necessary sessionTransacted=true

I’ve encountered this before, but just ran into it again: Despite suggestions to the contrary, it is necessary (at least with Spring 2.5.5, ActiveMQ 5.1.0, and JBoss 4.2.2.GA) to set sessionTransacted=true in the DefaultMessageListenerContainer even when it’s set up to connect to an external JTA transaction manager. If I set sessionTransacted=true, then when my messageListener throws a RuntimeException, the message is rolled back onto the queue; if I don’t, it doesn’t.

This has been documented elsewhere regarding Atomikos, but not ActiveMQ. I’ve posted a question to the Spring Remoting and JMS forum. We’ll see what comes of it!
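
For the record, the setting looks something like this in a plain bean definition (a sketch; the referenced bean ids are placeholders for whatever your bean file defines). In the jms: namespace, the counterpart is acknowledge="transacted" on the jms:listener-container element.

```xml
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="queueConnectionFactory" />
    <property name="destination" ref="queue" />
    <property name="messageListener" ref="messageListener" />
    <property name="transactionManager" ref="transactionManager" />
    <!-- The setting in question: without it, the message was not put back on the queue -->
    <property name="sessionTransacted" value="true" />
</bean>
```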

Pardon me sir, was that an XA transaction that just rolled by?

Last time, we briefly tried local JMS transactions, hopefully on our way to demonstrating a successful XA transaction rollback.  (You know, a successful transaction failure! :)


On a whim, I added an acknowledge="transacted" attribute to my DefaultMessageListenerContainer bean… and now the transaction rolls back when the MessageListener throws a RuntimeException, putting the message back in the queue, whence it is redelivered!  Is this not the moment we have been working for days to witness??!

Maybe!

The working(?) bean file

The complete Spring bean file is:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:amq="http://activemq.org/config/1.0" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:jms="http://www.springframework.org/schema/jms" xmlns:aop="http://www.springframework.org/schema/aop"
    xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://activemq.org/config/1.0 http://activemq.apache.org/schema/core/activemq-core-5.0.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-2.5.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-2.5.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd">

    <jee:jndi-lookup id="queueConnectionFactory" jndi-name="java:activemq/QueueConnectionFactory" />

    <bean id="messageListener" class="com.ontsys.dmn.MessageListener" />

    <tx:jta-transaction-manager />

    <jms:listener-container transaction-manager="transactionManager" connection-factory="queueConnectionFactory"
        acknowledge="transacted">
        <jms:listener destination="myTest.Queue" ref="messageListener" />
    </jms:listener-container>

    <tx:advice id="txAdvice" transaction-manager="transactionManager">
        <tx:attributes>
            <tx:method name="*" />
        </tx:attributes>
    </tx:advice>

    <aop:config>
        <aop:pointcut id="messageReceiveOperation" expression="execution(* com.ontsys.dmn.MessageListener.onMessage(..))" />
        <aop:advisor advice-ref="txAdvice" pointcut-ref="messageReceiveOperation" />
    </aop:config>
</beans>

JBoss Server Debug-Level Log

Part of the JBoss debug-level server log follows.  I’ve bolded the lines that look rollbacky and/or XA-like.

2008-07-22 14:48:24,915 DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer] Received message of type [class org.apache.activemq.command.ActiveMQTextMessage] from consumer [ActiveMQMessageConsumer { value=ID:osc8373-4330-1216752471934-0:0:33:1, started=true }] of transactional session [ManagedSessionProxy { ActiveMQSession {id=ID:osc8373-4330-1216752471934-0:0:33,started=true} }]
2008-07-22 14:48:24,915 DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer] Invoking listener with message of type [class org.apache.activemq.command.ActiveMQTextMessage] and session [ManagedSessionProxy { ActiveMQSession {id=ID:osc8373-4330-1216752471934-0:0:33,started=true} }]
2008-07-22 14:48:24,946 DEBUG [org.springframework.transaction.jta.JtaTransactionManager] Using transaction object [org.springframework.transaction.jta.JtaTransactionObject@628e42]
2008-07-22 14:48:24,946 DEBUG [org.springframework.transaction.jta.JtaTransactionManager] Participating in existing transaction
2008-07-22 14:48:24,946 DEBUG [org.springframework.transaction.interceptor.TransactionInterceptor] Getting transaction for [javax.jms.MessageListener.onMessage]
2008-07-22 14:48:24,946 DEBUG [org.springframework.transaction.interceptor.TransactionInterceptor] Completing transaction for [javax.jms.MessageListener.onMessage] after exception: java.lang.RuntimeException: Boom: boom
2008-07-22 14:48:24,946 DEBUG [org.springframework.transaction.interceptor.RuleBasedTransactionAttribute] Applying rules to determine whether transaction should rollback on java.lang.RuntimeException: Boom: boom
2008-07-22 14:48:24,946 DEBUG [org.springframework.transaction.interceptor.RuleBasedTransactionAttribute] Winning rollback rule is: null
2008-07-22 14:48:24,946 DEBUG [org.springframework.transaction.interceptor.RuleBasedTransactionAttribute] No relevant rollback rule found: applying default rules
2008-07-22 14:48:24,946 DEBUG [org.springframework.transaction.jta.JtaTransactionManager] Participating transaction failed – marking existing transaction as rollback-only
2008-07-22 14:48:24,946 DEBUG [org.springframework.transaction.jta.JtaTransactionManager] Setting JTA transaction rollback-only
2008-07-22 14:48:24,946 DEBUG [org.springframework.transaction.support.TransactionSynchronizationManager] Retrieved value [org.springframework.jms.connection.JmsResourceHolder@a98e77] for key [org.apache.activemq.ra.ActiveMQConnectionFactory@335297] bound to thread [org.springframework.jms.listener.DefaultMessageListenerContainer#0-1]
2008-07-22 14:48:24,946 DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer] Rolling back transaction because of listener exception thrown: java.lang.RuntimeException: Boom: boom
2008-07-22 14:48:24,946 WARN  [org.springframework.jms.listener.DefaultMessageListenerContainer] Execution of JMS message listener failed
java.lang.RuntimeException: Boom: boom

What exactly was it that rolled back?

Did this roll back an XA transaction?  Or did I unwittingly switch over to JMS local transactions by setting acknowledge="transacted"? The server log output certainly seems to confirm that it was an out-and-out XA transaction that we rolled back.  We can further confirm this by adding a JDBC/Hibernate operation to the transaction and seeing whether a rollback really rolls back both the JMS and Hibernate portions.

Thanks to Murali Kosaraju’s JavaWorld article for the idea of setting sessionTransacted on the DMLC, even in an XA situation!

Next time, we’ll try to get the JDBC datasource in place.

Why there were two consumers listening

I found out why my .war project had two consumers listening to the queue: My application logic was allowing the line

ctx = new ClassPathXmlApplicationContext("applicationContext.xml");

to be called twice, which instantiated the listener a second time. I fixed the guard condition, and now when I deploy I only have one consumer listening to my queue, as I expected should be the case!

(See my self-answering thread on the Spring forum.)
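
A different approach from the guard fix, for a .war project, would be to let Spring's ContextLoaderListener create the context once at webapp startup instead of calling new ClassPathXmlApplicationContext(...) in application code. This is only a sketch of the web.xml entries; it assumes spring-web is on the classpath and that the bean file is at the root of the classpath.

```xml
<!-- In web.xml: the servlet container starts the Spring context exactly once per deployment -->
<context-param>
    <param-name>contextConfigLocation</param-name>
    <param-value>classpath:applicationContext.xml</param-value>
</context-param>
<listener>
    <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
```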

Simple JMS transaction rollbacks work…

Last time, I thought I had AOP transaction advice in place, but I still was not seeing the rollbacks I thought I should. This time, we try simple JMS local transactions to see if they work, to have another data point…


If I change my DefaultMessageListenerContainer bean to remove the external JTA transaction manager reference and replace it with native JMS transactions like so:

<jms:listener-container acknowledge="transacted" connection-factory="queueConnectionFactory">
    <jms:listener destination="gloriousTest.Queue" ref="messageListener" />
</jms:listener-container>

…then I can throw an unchecked exception and it will roll back the receive of the message.  The message is then redelivered several times, until it goes to the dead letter queue.  However, it doesn’t seem to pay attention to my setting that is supposed to roll back only for my SpecialRuntimeExceptions, not all RuntimeExceptions (unless I need a no-rollback-for attribute in my tx:advice):

<tx:advice id="txAdvice" transaction-manager="transactionManager">
    <tx:attributes>
        <tx:method name="*" rollback-for="SpecialRuntimeException" />
    </tx:attributes>
</tx:advice>
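
As far as I understand Spring's rollback rules, rollback-for only adds cases to the default behavior (roll back on any RuntimeException or Error); it does not narrow it. Limiting rollback to one exception type would take a no-rollback-for rule as well, roughly as sketched below. Note that this only governs the transaction advice: the listener container appears to roll back on its own whenever an exception escapes onMessage(), so this may still not be enough to keep the message from being redelivered.

```xml
<tx:advice id="txAdvice" transaction-manager="transactionManager">
    <tx:attributes>
        <!-- The most specific matching rule wins: SpecialRuntimeException rolls back, other RuntimeExceptions do not -->
        <tx:method name="*" rollback-for="SpecialRuntimeException" no-rollback-for="java.lang.RuntimeException" />
    </tx:attributes>
</tx:advice>
```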

Update:

Back to a JTA transaction manager, adding the propagation="REQUIRES_NEW" attribute didn’t help either (still no rollback when my MessageListener throws a SpecialRuntimeException):

<tx:advice id="txAdvice" transaction-manager="transactionManager">
    <tx:attributes>
        <tx:method name="*" rollback-for="SpecialRuntimeException" propagation="REQUIRES_NEW" />
    </tx:attributes>
</tx:advice>

Guess we’ll keep trying next time.

The jms:listener element: hardcoded queue name

I thought I’d try using Spring’s jms: namespace to define my DefaultMessageListenerContainer.  So I changed this:

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="queueConnectionFactory" />
    <property name="destination" ref="queue" />
    <property name="messageListener" ref="messageListener" />
    <property name="transactionManager" ref="transactionManager" />
</bean>

to this:

<jms:listener-container connection-factory="queueConnectionFactory" transaction-manager="transactionManager">
    <jms:listener destination="queue" ref="messageListener" />
</jms:listener-container>

(the queue bean was defined this way:

<amq:queue id="queue" physicalName="gloriousTest.Queue" />
)

But now when I deployed my .war project, ActiveMQ had a queue named “queue”, and no consumers were listening to the gloriousTest.Queue queue.

As it turns out, the jms:listener element’s destination attribute refers to a physical queue name; it’s not a bean reference.  So I guess if I were going to use the jms:listener element I would need to hardcode gloriousTest.Queue there.  I’m not ready to do that at this point, so I think I’m going to revert to the plain old <bean> element usage for my DefaultMessageListenerContainer.
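
Another option that might be worth a look: the jms:listener-container element accepts a destination-resolver attribute, and Spring ships a JndiDestinationResolver. With that in place, the destination value should be treated as a JNDI name rather than a physical queue name. The sketch below assumes the queue has been bound in JNDI, and the name shown is hypothetical.

```xml
<bean id="jndiDestinationResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver" />

<jms:listener-container connection-factory="queueConnectionFactory" transaction-manager="transactionManager"
    destination-resolver="jndiDestinationResolver">
    <!-- queue/gloriousTest is a made-up JNDI name; use whatever name the queue is bound under -->
    <jms:listener destination="queue/gloriousTest" ref="messageListener" />
</jms:listener-container>
```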

Something works!

After days of untested theories, head-scratching, dead ends, and rabbit trails, I’m happy to report that something now works.

The Beginning of Unstuckitude

First thing was, Keith got back, took a look at the class cast errors in the stack trace and said something along the lines of, “That looks like a dependency issue.”

And that is what it was: my .war project was including javax.jms.ConnectionFactory more than once. (There is also a Spring forum question and answer for this problem.)

Step 1: activemq-all to activemq-core

We browsed through the .jar files in the activemq-all .jar file (manually, by opening each enclosing .jar in WinZip — anyone have a better way of doing this?) and indeed it did have javax.jms.* classes in it.  Keith knew that having the same classes multiple times in your .war can cause problems as the app server may load the wrong ones, so in my project’s pom.xml we replaced the dependency on activemq-all with a dependency on activemq-core instead:


<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.0.0</version>
</dependency>

When we tried to do a Maven Install of this, though, we got:


Missing:
----------
1) org.apache.activemq:activeio-core:jar:3.1-SNAPSHOT
...
2) org.apache.activemq:activeio-core:test-jar:tests:3.1-SNAPSHOT
...
----------
2 required artifacts are missing.

Step 2: Exclude activeio-core dependency

We don’t need the activeio-core tests anyway, so we put in an exclusion for this:


<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activeio-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>

This time Maven Install ran cleanly.

Step 3: Scope = Provided

When we deployed, though, we got errors like this:

10:12:59,417 WARN  [JBossManagedConnectionPool] Throwable while attempting to get a new connection: null
javax.resource.ResourceException: Could not create connection.
...
Caused by: javax.jms.JMSException: Could not create Transport. Reason: java.io.IOException: Transport scheme NOT recognized: [tcp]
...
08:35:36,453 INFO  [DefaultMessageListenerContainer] Could not refresh JMS Connection - retrying in 5000 ms
javax.jms.JMSException: Could not create Transport. Reason: java.io.IOException: Transport scheme NOT recognized: [tcp]
...

The DefaultMessageListenerContainer, finding itself unable to make a JMS connection, kept trying every five seconds —  boom, bang, bang, boom, zowie!  (I’m still getting acclimated to the spectacular errors you can get when deploying to an app server. :)

At first I didn’t notice one of the Caused Bys farther down, which said:

Caused by: java.lang.ClassCastException: org.apache.activemq.transport.tcp.TcpTransportFactory

But sure enough, it turns out that this error was caused by double-deployed classes.  This time, it was the activemq-core jar that is already contained inside the activemq-ra.rar file. Our project already depends on that .rar file being there, and we don’t want two copies of the jar, so we updated our pom file’s activemq-core dependency definition by setting scope to provided (line 5), as follows:

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.0.0</version>
    <scope>provided</scope>
    <exclusions>
        <exclusion>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activeio-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Success

This time, our .war file deploys without stack traces (woohoo!),  and when I point my browser to the ActiveMQ Console I see that my “gloriousTest.Queue” queue shows up.  I use the ActiveMQ Console to send a message to the queue and… my MessageListener gets the message and prints its debug output to the JBoss console!

The Files That Worked

applicationContext.xml, the Spring bean file

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:amq="http://activemq.org/config/1.0"
    xmlns:jee="http://www.springframework.org/schema/jee"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
        http://activemq.org/config/1.0 http://activemq.apache.org/schema/core/activemq-core-5.0.0.xsd
        http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-2.5.xsd">

    <jee:jndi-lookup id="queueConnectionFactory" jndi-name="java:activemq/QueueConnectionFactory" />

    <amq:queue id="queue" physicalName="gloriousTest.Queue" />

    <bean id="messageListener" class="com.something.dmn.MessageListener" />

    <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager" />

    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="queueConnectionFactory" />
        <property name="destination" ref="queue" />
        <property name="messageListener" ref="messageListener" />
        <property name="transactionManager" ref="transactionManager" />
    </bean>
</beans>

pom.xml (relevant portions)

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${springFramework.version}</version>
        </dependency>
        ...
        <dependency>
            <groupId>org.apache.geronimo.specs</groupId>
            <artifactId>geronimo-jms_1.1_spec</artifactId>
            <version>1.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-core</artifactId>
        <version>5.0.0</version>
        <scope>provided</scope>
        <exclusions>
            <exclusion>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activeio-core</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
    </dependency>
    ...
</dependencies>
<properties>
    ...
    <springFramework.version>2.5.4</springFramework.version>
</properties>

activemq-jms-ds.xml

This file I was able to use without modification from the example on the ActiveMQ site.  I noticed, though, that the <ServerUrl> elements under connection-factories -> tx-connection-factory seem to be ignored.  (Maybe that is because in our case the ServerUrl is already being pre-specified in the activemq-ra.rar’s ra.xml file.)

ra.xml (in activemq-ra.rar)

I think the only change I had to make to this resource adapter archive was to the connector -> resourceadapter -> config-property-value element for the ServerUrl config-property-name, to change it from vm://localhost to tcp://localhost:61616.  (Or maybe it was already tcp… I don’t remember now.)
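
The element in question would look roughly like this. It is a sketch based on the standard JCA ra.xml layout rather than a copy of the actual file, so the surrounding elements and the other config-property entries are left out.

```xml
<!-- Inside connector -> resourceadapter in ra.xml -->
<config-property>
    <config-property-name>ServerUrl</config-property-name>
    <config-property-type>java.lang.String</config-property-type>
    <!-- Point the resource adapter at the broker's TCP transport instead of an in-VM broker -->
    <config-property-value>tcp://localhost:61616</config-property-value>
</config-property>
```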

Next

Now that we have the JCA/JMS/JNDI/JBoss/Spring/ActiveMQ stuff playing nicely together, we can try to actually do some XA stuff!