Francesco
Francesco

Reputation: 1792

Integrating ActiveMQ and Wildfly with Apache Camel

I'm trying to make Wildfly and ActiveMQ working together with Apache Camel, let me explain the scenario. Every hour a camel batch poll an FTP server, grab the files and send them to an ActiveMQ broker. The broker implements two routes, numbers and big.numbers. Messages are queued into numbers, if they are not ready to be send they are routed to big.numbers. Messages in big.numbers are dequeued and translated by camel processor and queued to numbers as ready to be send. The ready to be send messages in numbers are sent to Wildfly where a MDB do something. Everything works fine except for the last step. When MDB receive a message throws an Exception: org.hornetq.jms.client.HornetQBytesMessage cannot be cast to javax.jms.TextMessage. Here is the camel.xml I use to implement routes on my ActiveMQ broker:

...
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <package>edu.foo.amq.camel</package>
    <dataFormats>
        <string id="utf-8-string" charset="UTF-8"/>
    </dataFormats>


    <route id="toBigNumeri">
        <from uri="activemq:numbers.queue"/>
        <marshal ref="utf-8-string"/>
        <choice>
            <when>
                <simple>
                    ${in.header.readyToGo} != true
                </simple>
                <process ref="big.numbers.processor"/>
                <to uri="activemq:big.numbers.queue"/>
            </when>
            <otherwise>
                <process ref="done.processor"/>
                <to uri="wildflycf:generatoreQueue?username=dummyusr&amp;password=dummy1234"/>
            </otherwise>
        </choice>           
    </route>

    <route id="toNumeri">
        <from uri="activemq:big.numbers.queue"/>
        <marshal ref="utf-8-string"/>
        <split>
            <tokenize token="\n"/>
            <process ref="numbers.processor"/>
            <setHeader headerName="readyToGo">
                <constant>true</constant>
            </setHeader>
            <to uri="activemq:numbers.queue"/>
        </split>
        <process ref="dump.processor"/>
        <to uri="activemq:dump.queue"/>
    </route>
</camelContext>

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="connectionFactory">
      <bean class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616"/>
        <property name="userName" value="${activemq.username}"/>
        <property name="password" value="${activemq.password}"/>
      </bean>
    </property>
</bean>

<bean id="jndiTmp" class="org.springframework.jndi.JndiTemplate">
    <property name="environment">
        <props>
            <prop key="java.naming.provider.url">http-remoting://localhost:8080</prop>
            <prop key="java.naming.factory.initial">org.jboss.naming.remote.client.InitialContextFactory</prop>
            <prop key="java.naming.security.principal">jhon</prop>
            <prop key="java.naming.security.credentials">doe</prop>
        </props>
    </property>
</bean>

<bean id="wfcf" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiTemplate" ref="jndiTmp"/>
    <property name="jndiName" value="jms/RemoteConnectionFactory"/>
</bean>

<bean id="wildflycf" class="org.apache.camel.component.jms.JmsComponent">
    <property name="connectionFactory" ref="wfcf"/>
</bean>
...

Here is my MDB:

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.log4j.Logger;

@MessageDriven(name="GeneratoreMDB", activationConfig = {
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"), 
@ActivationConfigProperty(propertyName="destinationLookup", propertyValue=QueueHandler.jmsQueue)}
)
public class QueueHandler implements MessageListener {

public static final String jmsQueue = "/export/jms/generatoreQueue";

private static final Logger log = Logger.getLogger(QueueHandler.class);

  @Override
  public void onMessage(Message arg0) {
    try {
        log.info(((TextMessage) arg0).getText());
        ...
    } catch (JMSException e) {
        log.error(e);
    }
  }
}

Here is the exception my MDB throws:

2014-12-05 09:12:04,864 ERROR [org.hornetq.ra] (Thread-35 (HornetQ-client-global-threads-1727074612)) HQ154004: Failed to deliver message: javax.ejb.EJBTransactionRolledbackException: org.hornetq.jms.client.HornetQBytesMessage cannot be cast to javax.jms.TextMessage
at org.jboss.as.ejb3.tx.CMTTxInterceptor.handleInCallerTx(CMTTxInterceptor.java:163)
at org.jboss.as.ejb3.tx.CMTTxInterceptor.invokeInCallerTx(CMTTxInterceptor.java:253)
at org.jboss.as.ejb3.tx.CMTTxInterceptor.required(CMTTxInterceptor.java:342)
at org.jboss.as.ejb3.tx.CMTTxInterceptor.processInvocation(CMTTxInterceptor.java:239)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.component.interceptors.CurrentInvocationContextInterceptor.processInvocation(CurrentInvocationContextInterceptor.java:41)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.component.invocationmetrics.WaitTimeInterceptor.processInvocation(WaitTimeInterceptor.java:43)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.security.SecurityContextInterceptor.processInvocation(SecurityContextInterceptor.java:95)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.component.interceptors.ShutDownInterceptorFactory$1.processInvocation(ShutDownInterceptorFactory.java:64)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.component.interceptors.LoggingInterceptor.processInvocation(LoggingInterceptor.java:59)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ee.component.NamespaceContextInterceptor.processInvocation(NamespaceContextInterceptor.java:50)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.component.interceptors.AdditionalSetupInterceptor.processInvocation(AdditionalSetupInterceptor.java:55)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.component.messagedriven.MessageDrivenComponentDescription$5$1.processInvocation(MessageDrivenComponentDescription.java:211)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.ContextClassLoaderInterceptor.processInvocation(ContextClassLoaderInterceptor.java:64)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.InterceptorContext.run(InterceptorContext.java:326)
at org.wildfly.security.manager.WildFlySecurityManager.doChecked(WildFlySecurityManager.java:448)
at org.jboss.invocation.AccessCheckingInterceptor.processInvocation(AccessCheckingInterceptor.java:61)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.InterceptorContext.run(InterceptorContext.java:326)
at org.jboss.invocation.PrivilegedWithCombinerInterceptor.processInvocation(PrivilegedWithCombinerInterceptor.java:80)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.ChainedInterceptor.processInvocation(ChainedInterceptor.java:61)
at org.jboss.as.ee.component.ViewService$View.invoke(ViewService.java:185)
at org.jboss.as.ee.component.ViewDescription$1.processInvocation(ViewDescription.java:182)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.ChainedInterceptor.processInvocation(ChainedInterceptor.java:61)
at org.jboss.as.ee.component.ProxyInvocationHandler.invoke(ProxyInvocationHandler.java:73)
at edu.foo.incassionline.generatore.jms.QueueHandler$$$view3.onMessage(Unknown Source)
at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source) [:1.7.0_45]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) [rt.jar:1.7.0_45]
at java.lang.reflect.Method.invoke(Method.java:606) [rt.jar:1.7.0_45]
at org.jboss.as.ejb3.inflow.MessageEndpointInvocationHandler.doInvoke(MessageEndpointInvocationHandler.java:139)
at org.jboss.as.ejb3.inflow.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:73)
at edu.foo.incassionline.generatore.jms.QueueHandler$$$endpoint1.onMessage(Unknown Source)
at org.hornetq.ra.inflow.HornetQMessageHandler.onMessage(HornetQMessageHandler.java:319) [hornetq-ra-2.4.1.Final.jar:]
at org.hornetq.core.client.impl.ClientConsumerImpl.callOnMessage(ClientConsumerImpl.java:1116) [hornetq-core-client-2.4.1.Final.jar:]
at org.hornetq.core.client.impl.ClientConsumerImpl.access$500(ClientConsumerImpl.java:56) [hornetq-core-client-2.4.1.Final.jar:]
at org.hornetq.core.client.impl.ClientConsumerImpl$Runner.run(ClientConsumerImpl.java:1251) [hornetq-core-client-2.4.1.Final.jar:]
at org.hornetq.utils.OrderedExecutorFactory$OrderedExecutor$1.run(OrderedExecutorFactory.java:104) [hornetq-core-client-2.4.1.Final.jar:]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [rt.jar:1.7.0_45]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [rt.jar:1.7.0_45]
at java.lang.Thread.run(Thread.java:744) [rt.jar:1.7.0_45]
Caused by: java.lang.ClassCastException: org.hornetq.jms.client.HornetQBytesMessage cannot be cast to javax.jms.TextMessage
at edu.foo.incassionline.generatore.jms.QueueHandler.onMessage(QueueHandler.java:27)
at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source) [:1.7.0_45]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) [rt.jar:1.7.0_45]
at java.lang.reflect.Method.invoke(Method.java:606) [rt.jar:1.7.0_45]
at org.jboss.as.ee.component.ManagedReferenceMethodInterceptor.processInvocation(ManagedReferenceMethodInterceptor.java:52)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.WeavedInterceptor.processInvocation(WeavedInterceptor.java:53)
at org.jboss.as.ee.component.interceptors.UserInterceptorFactory$1.processInvocation(UserInterceptorFactory.java:63)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.InterceptorContext$Invocation.proceed(InterceptorContext.java:407)
at org.jboss.as.weld.ejb.Jsr299BindingsInterceptor.doMethodInterception(Jsr299BindingsInterceptor.java:82)
at org.jboss.as.weld.ejb.Jsr299BindingsInterceptor.processInvocation(Jsr299BindingsInterceptor.java:93)
at org.jboss.as.ee.component.interceptors.UserInterceptorFactory$1.processInvocation(UserInterceptorFactory.java:63)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.WeavedInterceptor.processInvocation(WeavedInterceptor.java:53)
at org.jboss.as.ee.component.interceptors.UserInterceptorFactory$1.processInvocation(UserInterceptorFactory.java:63)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.component.invocationmetrics.ExecutionTimeInterceptor.processInvocation(ExecutionTimeInterceptor.java:43)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.InterceptorContext$Invocation.proceed(InterceptorContext.java:407)
at org.jboss.weld.ejb.AbstractEJBRequestScopeActivationInterceptor.aroundInvoke(AbstractEJBRequestScopeActivationInterceptor.java:55)
at org.jboss.as.weld.ejb.EjbRequestScopeActivationInterceptor.processInvocation(EjbRequestScopeActivationInterceptor.java:83)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ee.concurrent.ConcurrentContextInterceptor.processInvocation(ConcurrentContextInterceptor.java:45)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.InitialInterceptor.processInvocation(InitialInterceptor.java:21)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.invocation.ChainedInterceptor.processInvocation(ChainedInterceptor.java:61)
at org.jboss.as.ee.component.interceptors.ComponentDispatcherInterceptor.processInvocation(ComponentDispatcherInterceptor.java:53)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.component.pool.PooledInstanceInterceptor.processInvocation(PooledInstanceInterceptor.java:51)
at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:309)
at org.jboss.as.ejb3.tx.CMTTxInterceptor.invokeInCallerTx(CMTTxInterceptor.java:251)
... 49 more

I can't get why the MDB gets the message as HornetQ message and not a simple JMS Message.

Upvotes: 0

Views: 1037

Answers (1)

Peter Keller
Peter Keller

Reputation: 7636

When sending a JMS message, Camel converts the message body to the following JMS message types (see here):

╔══════════════════════╦═════════════════════════╗
║ Body Type            ║ JMS Message             ║
╠══════════════════════╬═════════════════════════╣
║ String               ║ javax.jms.TextMessage   ║
║ org.w3c.dom.Node     ║ javax.jms.TextMessage   ║
║ Map                  ║ javax.jms.MapMessage    ║
║ java.io.Serializable ║ javax.jms.ObjectMessage ║
║ byte[]               ║ javax.jms.BytesMessage  ║
║ java.io.File         ║ javax.jms.BytesMessage  ║
║ java.io.Reader       ║ javax.jms.BytesMessage  ║
║ java.io.InputStream  ║ javax.jms.BytesMessage  ║
║ java.nio.ByteBuffer  ║ javax.jms.BytesMessage  ║
╚══════════════════════╩═════════════════════════╝

Please, check the type of the message body in big.numbers.processor. There are two possibilities: 1) create a "text message type body" according Camel's convertion rules and cast to TextMessage, or 2) cast to BytesMessage instead.

Upvotes: 1

Related Questions