Posts Tagged JMS

It helps to have a message listener container

I couldn’t figure out why the AOP advice kept not running around the MessageListener’s onMessage() method in my integration test.  I had declared the MessageListener as a bean (the advice only gets applied to beans) and relaxed the pointcut expression so it shouldn’t have to reside in a certain subpackage; the setup looked good, but still no advice.  I ratcheted the logging level up and saw the message being sent, so why wasn’t the advice running?  I began to have that piteous, put-upon feeling you get when you think you’re doing everything right but the program is unreasonably refusing to cooperate.

Then I realized that I hadn’t declared a DefaultMessageListenerContainer in my Spring bean file.  And then I realized that I had only been seeing messages being sent, never received.  (The advice only applies when the message is received).  Oops!

I declared the message listener container bean, and messages began to be received with the advice applying as it should.  : )

, ,

Leave a comment

Applying advice based on type

When we add JMS queue support to our architecture, we’ll want to apply some AOP advice each time a listener receives a message.  We want to have a convention for how the listeners will be set up so that our advice will find them.

In a recent communication with the design leads, I proposed that one of the ways we recognize a MessageListener be by having a convention that its class name must be *MessageListener.

One of the design leads asked a great question: Could we go based on the class implementing the MessageListener interface instead of being named a certain way?  That would be slick, wouldn’t it?  I didn’t know the answer, though.  I only knew about matching based on name of the class/package/method/signature.

Starting

Too much advice

This pointcut expression works, but applies to too much:

execution(void com.ontsys..svc..*.onMessage(..))

We want to modify it to only apply to classes that implement the javax.jms.MessageListener interface.

Trying

A ineffectual plus

At first it looked like the plus sign was what we needed.  The AspectJTM Programming Guide Appendix B. Language Semantics: Type patterns section on Subtype patterns says:

It is possible to pick out all subtypes of a type (or a collection of types) with the “+” wildcard. The “+” wildcard follows immediately a type name pattern.

And the Spring manual contains an example of the + wildcard on an execution pointcut.

My first attempt was a misuse of the plus sign, though I didn’t know it right away:

execution(void com.ontsys..svc..MessageListener+.onMessage(..)

That didn’t work– when I deployed the project, the advice wasn’t applied to my listener. (I could tell because the listener relies on something the AOP advice sets up, and it blew up when that wasn’t there.)

I see now that I was misusing the plus operator — I wanted the advice to apply to any implementor/subclass of javax.jms.MessageListener, but the way I was using it it was as if I was trying to have the advice apply to any implementor/subclass of com.ontsys..svc..MessageListener.

Escaping from the Ampersands

The Spring manual’s AOP pointcut examples section shows an example using “target”:

target(com.xyz.service.AccountService)

and says it matches “any join point (method execution only in Spring AOP) where the target object implements the AccountService interface”

I tried this:

execution(void com.ontsys..svc..*.onMessage(..)) && target(MessageListener)

On deployment, though, I got an error about one of the ampersands…

And…

I modified the pointcut expression to use the word and instead:

execution(void com.ontsys..svc..*.onMessage(..)) and target(MessageListener)

Now I got this error:

warning no match for this type name: MessageListener [Xlint:invalidAbsoluteTypeName]

Success

Noticing that the Spring manual’s AOP pointcut examples all seemed to show fully-qualified types, I thought I’d try fully qualifying the name of the MessageListener interface:

execution(void com.ontsys..svc..*.onMessage(..)) and target(javax.jms.MessageListener)

This one worked. Woohoo!

Maybe I’m on my way to being a fully-qualified pointcut-expression writer.  :D

, ,

Leave a comment

We’ll want to JNDI-lookup the queue, too…

In our system we have proved out looking up the JMS connection factory from JNDI, but up to this point we have declared the Queue bean using an <amq:queue> element in the application bean files.  This ties our application code to the ActiveMQ broker, which we don’t want.  We want to move toward looking up the connection factory and the queue from JNDI.  That way the application bean file only needs to know the JNDI name, and we can configure the queue in the connection factory datasource configuration file on the app server (e.g., activemq-jms-ds.xml), probably as an mbean element in there:


   <mbean code="org.jboss.resource.deployment.AdminObject" name="activemq.queue:name=TheMBeanNameYouSeeInTheJMXConsole">
      <attribute name="JNDIName">this/is/the/jndi/name/the/bean/files/will/use</attribute>
      <depends optional-attribute-name="RARName">jboss.jca:service=RARDeployment,name='activemq-rar-5.1.0.rar'</depends>
      <attribute name="Type">javax.jms.Queue</attribute>
      <attribute name="Properties">PhysicalName=theQueueName.YouWillSee.In.ActiveMQ</attribute>
   </mbean>

But Who Will Care for the Babies?

This will be fine for a production environment — but what about our integration tests?  Right now we use Bitronix Transaction Manager 1.3’s JNDI server facilities to provide JNDI lookup of the JDBC datasource for our integration tests.  I thought we’d also use BTM to look up the JMS connection factory… but BTM does not support JNDI-looking-up queues as well.  Ludovic says:

If you really need a complete JNDI server, you can use Tomcat’s which is quite easy to reuse. This is what I recommended using before BTM 1.3, there is an example here: http://docs.codehaus.org/display/BTM/Hibernate

That’s probably what we’ll need to look into when the time comes…

, , , , ,

Leave a comment

JMX meets the DefaultMessageListenerContainer

(It’s kind of like King Kong vs. Godzilla, only more peaceful)

According to the Javadoc, these properties on the DefaultMessageListenerContainer “can be modified at runtime, for example through JMX” :

concurrentConsumers
maxConcurrentConsumers
maxMessagesPerTask
idleTaskExecutionLimit

Armed with this and my handy Spring manual, I added the following to my beans file:


    <bean id="exporter" class="org.springframework.jmx.export.MBeanExporter" lazy-init="false">
<property name="beans">
            <map>
                <entry key="bean:name=listenerContainer1" value-ref="listenerContainer" />
            </map>
        </property>
    </bean>

(In this beans file I already had a DefaultMessageListenerContainer bean with an id of listenerContainer.)

Now when I fired up my JBoss JMX Management, a new bean category appears:

I can click on that link and see several properties that I can change on the DefaultMessageListenerContainer bean, including ConcurrentConsumers, which is currently 1.  If I change that to 2 and press the Apply Changes button though, I get a stack trace in JBoss:

12:47:17,978 ERROR [[HtmlAdaptor]] Servlet.service() for servlet HtmlAdaptor threw exception java.lang.ClassNotFoundException: org.springframework.jms.support.destination.DestinationResolver

So we’ll have to figure out what that’s about.

, ,

5 Comments

The mysteriously necessary sessionTransacted=true

I’ve encountered this before, but just ran into it again: Despite suggestions to the contrary, it is necessary (at least with Spring 2.5.5, ActiveMQ 5.1.0, and JBoss 4.2.2.GA) to set sessionTransacted=true in the DefaultMessageListenerContainer even when it’s set up to connect to an external JTA transaction manager. If I set sessionTransacted=true, then when my messageListener throws a RuntimeException, the message is rolled back onto the queue; if I don’t, it doesn’t.

This has been documented elsewhere regarding Atomikos, but not ActiveMQ. I’ve posted a question to the Spring Remoting and JMS forum. We’ll see what comes of it!

, , , ,

15 Comments

Pardon me sir, was that an XA transaction that just rolled by?

Last time, we briefly tried local JMS transactions, hopefully on our way to demonstrating a successful XA transaction rollback.  (You know, a successful transaction failure! :)


On a whim, I added an acknowledge="transacted" attribute to my DefaultMessageListenerContainer bean… and now the transaction rolls back when the MessageListener throws a RuntimeException, putting the message back in the queue, whence it is redelivered!  Is this not the moment we have been working for days to witness??!

Maybe!

The working(?) bean file

The complete Spring bean file is:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:amq="http://activemq.org/config/1.0" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:jms="http://www.springframework.org/schema/jms" xmlns:aop="http://www.springframework.org/schema/aop"
    xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://activemq.org/config/1.0 http://activemq.apache.org/schema/core/activemq-core-5.0.0.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-2.5.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-2.5.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd">

    <jee:jndi-lookup id="queueConnectionFactory" jndi-name="java:activemq/QueueConnectionFactory" />

    <bean id="messageListener" class="com.ontsys.dmn.MessageListener" />

    <tx:jta-transaction-manager />

    <jms:listener-container transaction-manager="transactionManager" connection-factory="queueConnectionFactory"
        acknowledge="transacted">
        <jms:listener destination="myTest.Queue" ref="messageListener" />
    </jms:listener-container>

    <tx:advice id="txAdvice" transaction-manager="transactionManager">
        <tx:attributes>
            <tx:method name="*" />
        </tx:attributes>
    </tx:advice>

    <aop:config>
        <aop:pointcut id="messageReceiveOperation" expression="execution(* com.ontsys.dmn.MessageListener.onMessage(..))" />
        <aop:advisor advice-ref="txAdvice" pointcut-ref="messageReceiveOperation" />
    </aop:config>
</beans>

JBoss Server Debug-Level Log

Part of the JBoss debug-level server log follows.  I’ve bolded the lines that look rollbacky and/or XA-like.

2008-07-22 14:48:24,915 DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer] Received message of type [class org.apache.activemq.command.ActiveMQTextMessage] from consumer [ActiveMQMessageConsumer { value=ID:osc8373-4330-1216752471934-0:0:33:1, started=true }] of transactional session [ManagedSessionProxy { ActiveMQSession {id=ID:osc8373-4330-1216752471934-0:0:33,started=true} }]
2008-07-22 14:48:24,915 DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer] Invoking listener with message of type [class org.apache.activemq.command.ActiveMQTextMessage] and session [ManagedSessionProxy { ActiveMQSession {id=ID:osc8373-4330-1216752471934-0:0:33,started=true} }]
2008-07-22 14:48:24,946 DEBUG [org.springframework.transaction.jta.JtaTransactionManager] Using transaction object [org.springframework.transaction.jta.JtaTransactionObject@628e42]
2008-07-22 14:48:24,946 DEBUG [org.springframework.transaction.jta.JtaTransactionManager] Participating in existing transaction
2008-07-22 14:48:24,946 DEBUG [org.springframework.transaction.interceptor.TransactionInterceptor] Getting transaction for [javax.jms.MessageListener.onMessage]
2008-07-22 14:48:24,946 DEBUG [org.springframework.transaction.interceptor.TransactionInterceptor] Completing transaction for [javax.jms.MessageListener.onMessage] after exception: java.lang.RuntimeException: Boom: boom
2008-07-22 14:48:24,946 DEBUG [org.springframework.transaction.interceptor.RuleBasedTransactionAttribute] Applying rules to determine whether transaction should rollback on java.lang.RuntimeException: Boom: boom
2008-07-22 14:48:24,946 DEBUG [org.springframework.transaction.interceptor.RuleBasedTransactionAttribute] Winning rollback rule is: null
2008-07-22 14:48:24,946 DEBUG [org.springframework.transaction.interceptor.RuleBasedTransactionAttribute] No relevant rollback rule found: applying default rules
2008-07-22 14:48:24,946 DEBUG [org.springframework.transaction.jta.JtaTransactionManager] Participating transaction failed – marking existing transaction as rollback-only
2008-07-22 14:48:24,946 DEBUG [org.springframework.transaction.jta.JtaTransactionManager] Setting JTA transaction rollback-only
2008-07-22 14:48:24,946 DEBUG [org.springframework.transaction.support.TransactionSynchronizationManager] Retrieved value [org.springframework.jms.connection.JmsResourceHolder@a98e77] for key [org.apache.activemq.ra.ActiveMQConnectionFactory@335297] bound to thread [org.springframework.jms.listener.DefaultMessageListenerContainer#0-1]
2008-07-22 14:48:24,946 DEBUG [org.springframework.jms.listener.DefaultMessageListenerContainer] Rolling back transaction because of listener exception thrown: java.lang.RuntimeException: Boom: boom
2008-07-22 14:48:24,946 WARN  [org.springframework.jms.listener.DefaultMessageListenerContainer] Execution of JMS message listener failed
java.lang.RuntimeException: Boom: boom

What exactly was it that rolled back?

Did this roll back an XA transaction?  Or did I unwittingly switch over to JMS local transactions by setting acknowledge="transacted"? The server log output certainly seems to confirm that it was an out-and-out XA transaction that we rolled back.  We can further confirm this by adding a JDBC/Hibernate operation to the transaction and see if a rollback really rolls back both the JMS and Hibernate portions.

Thanks to Murali Kosaraju’s JavaWorld article for the idea of setting sessionTransacted on the DMLC, even in an XA situation!

Next time, we’ll try to get the JDBC datasource in place.

, , ,

Leave a comment

Why there were two consumers listening

I found out why my .war project had two consumers listening to the queue: My application logic was allowing the line

ctx = new ClassPathXmlApplicationContext("applicationContext.xml");

to be called twice, which instantiated the listener the second time. I fixed the guard condition, and now when I deploy I only have one consumer listening to my queue, as I expected should be the case!

(See my self-answering thread on the Spring forum.)

,

Leave a comment