Reputation: 583
Hello!
I am working on a big ActiveMQ message hub. Many applications sends messages through our hub to various ActiveMQ clusters and it is our job to route those messages (just like a postal service). We want to utilize Camel to route messages from one ActiveMQ cluster to another. Part of the work we want Camel to do is to compress the messages between the clusters.
However, most messages goes through as ObjectMessages and we have no access to the jar files (more exact: no intrest in the object files nor the data, we just want to compress the data and move it to another broker). Just as the Postal service, we are not interested nor allowed to read the messages, just pass them along.
When I try to read from one ActiveMQ broker, compress a message and the send it to another broker, I will get a ClassCastException that my serializable class does not exists in classpath.
I don't want Camel/ActiveMQ to serialize the object back to a Java object, instead I want it just to read the message as bytes and compress those. Is it possible for Camel to read from ActiveMQ/JMS and think it just is a blob of bytes/stream/gobligook?
The closest I got is to specify jmsMessageType=Bytes as seen below, but it doesn't work. I have tried to find the answer on http://camel.apache.org/jms but I can't find a solution.
I have tried to enqueue a TextMessage and it works perfectly. However, this must work for every JMS message type, including ObjectMessages.
I am using Redhat A-MQ 6.3, Camel 2.17.0 sitting inside Karaf 2.4.0, ActiveMQ 5.11.0.
You will get "extra points" (or atleast I will be very happy) if you could help me by providing XML DSL example versions/code snippets and not Java DSL (I am very new to Camel and have not gotten used to translate between those), but any help at all is received with open arms!
Many thanks for any ideas or solutions you might provide!
My camel file is
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="jmsone" class="org.apache.camel.component.jms.JmsComponent">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://localhost" />
<property name="userName" value="admin" />
<property name="password" value="admin" />
</bean>
</property>
</bean>
<bean id="jmstwo" class="org.apache.camel.component.jms.JmsComponent">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://numbertwo:61616" />
<property name="userName" value="admin" />
<property name="password" value="admin" />
</bean>
</property>
</bean>
<camelContext xmlns="http://camel.apache.org/schema/spring">
<dataFormats>
<gzip id="compressor"/>
</dataFormats>
<route id="queueToOtherBroker">
<from uri="jmsone:andersPanders?jmsMessageType=Bytes" />
<marshal ref="compressor" />
<to uri="jmstwo:olleBandola" />
</route>
</camelContext>
</beans>
Addresses numberone and numbertwo are Docker containers, connectivity is not an issue and they can speak to each other.
Here is the full exception
2017-01-27 14:54:43,003 | WARN | r[andersPanders] | EndpointMessageListener | rg.apache.camel.util.CamelLogger 213 | 192 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - Failed to extract body due to: javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: Anders not found by org.apache.activemq.activemq-osgi [162]. Message: ActiveMQObjectMessage {commandId = 5, responseRequired = true, messageId = ID:VeryGoodTea-42317-1485528882333-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:VeryGoodTea-42317-1485528882333-1:1:1:1, destination = queue://andersPanders, transactionId = null, expiration = 0, timestamp = 1485528882925, arrival = 0, brokerInTime = 1485528882928, brokerOutTime = 1485528882944, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@57958b, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1109, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false}]
org.apache.camel.RuntimeCamelException: Failed to extract body due to: javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: Anders not found by org.apache.activemq.activemq-osgi [162]. Message: ActiveMQObjectMessage {commandId = 5, responseRequired = true, messageId = ID:VeryGoodTea-42317-1485528882333-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:VeryGoodTea-42317-1485528882333-1:1:1:1, destination = queue://andersPanders, transactionId = null, expiration = 0, timestamp = 1485528882925, arrival = 0, brokerInTime = 1485528882928, brokerOutTime = 1485528882944, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@57958b, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1109, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false}
at org.apache.camel.component.jms.JmsBinding.extractBodyFromJms(JmsBinding.java:160)[206:org.apache.camel.camel-jms:2.17.0.redhat-630187]
at org.apache.camel.component.jms.JmsMessage.createBody(JmsMessage.java:236)[206:org.apache.camel.camel-jms:2.17.0.redhat-630187]
at org.apache.camel.impl.MessageSupport.getBody(MessageSupport.java:47)[192:org.apache.camel.camel-core:2.17.0.redhat-630187]
at org.apache.camel.impl.DefaultUnitOfWork.<init>(DefaultUnitOfWork.java:90)[192:org.apache.camel.camel-core:2.17.0.redhat-630187]
at org.apache.camel.impl.DefaultUnitOfWork.<init>(DefaultUnitOfWork.java:72)[192:org.apache.camel.camel-core:2.17.0.redhat-630187]
at org.apache.camel.impl.DefaultUnitOfWorkFactory.createUnitOfWork(DefaultUnitOfWorkFactory.java:34)[192:org.apache.camel.camel-core:2.17.0.redhat-630187]
at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.createUnitOfWork(CamelInternalProcessor.java:683)[192:org.apache.camel.camel-core:2.17.0.redhat-630187]
at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.before(CamelInternalProcessor.java:651)[192:org.apache.camel.camel-core:2.17.0.redhat-630187]
at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.before(CamelInternalProcessor.java:628)[192:org.apache.camel.camel-core:2.17.0.redhat-630187]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:144)[192:org.apache.camel.camel-core:2.17.0.redhat-630187]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)[192:org.apache.camel.camel-core:2.17.0.redhat-630187]
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:91)[192:org.apache.camel.camel-core:2.17.0.redhat-630187]
at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:112)[206:org.apache.camel.camel-jms:2.17.0.redhat-630187]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:555)[205:org.apache.servicemix.bundles.spring-jms:3.2.16.RELEASE_2]
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:515)[205:org.apache.servicemix.bundles.spring-jms:3.2.16.RELEASE_2]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:485)[205:org.apache.servicemix.bundles.spring-jms:3.2.16.RELEASE_2]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325)[205:org.apache.servicemix.bundles.spring-jms:3.2.16.RELEASE_2]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)[205:org.apache.servicemix.bundles.spring-jms:3.2.16.RELEASE_2]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1103)[205:org.apache.servicemix.bundles.spring-jms:3.2.16.RELEASE_2]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1095)[205:org.apache.servicemix.bundles.spring-jms:3.2.16.RELEASE_2]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:992)[205:org.apache.servicemix.bundles.spring-jms:3.2.16.RELEASE_2]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)[:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)[:1.8.0_121]
at java.lang.Thread.run(Thread.java:745)[:1.8.0_121]
Caused by: javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: Anders not found by org.apache.activemq.activemq-osgi [162]
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:36)[162:org.apache.activemq.activemq-osgi:5.11.0.redhat-630187]
at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:208)[162:org.apache.activemq.activemq-osgi:5.11.0.redhat-630187]
at org.apache.camel.component.jms.JmsBinding.extractBodyFromJms(JmsBinding.java:135)[206:org.apache.camel.camel-jms:2.17.0.redhat-630187]
... 23 more
Caused by: java.lang.ClassNotFoundException: Anders not found by org.apache.activemq.activemq-osgi [162]
at org.apache.felix.framework.BundleWiringImpl.findClassOrResourceByDelegation(BundleWiringImpl.java:1556)[org.apache.felix.framework-4.4.1.jar:]
at org.apache.felix.framework.BundleWiringImpl.access$400(BundleWiringImpl.java:77)[org.apache.felix.framework-4.4.1.jar:]
at org.apache.felix.framework.BundleWiringImpl$BundleClassLoader.loadClass(BundleWiringImpl.java:1993)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)[:1.8.0_121]
at java.lang.Class.forName0(Native Method)[:1.8.0_121]
at java.lang.Class.forName(Class.java:348)[:1.8.0_121]
at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.load(ClassLoadingAwareObjectInputStream.java:140)[162:org.apache.activemq.activemq-osgi:5.11.0.redhat-630187]
at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.resolveClass(ClassLoadingAwareObjectInputStream.java:55)[162:org.apache.activemq.activemq-osgi:5.11.0.redhat-630187]
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)[:1.8.0_121]
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)[:1.8.0_121]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)[:1.8.0_121]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)[:1.8.0_121]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)[:1.8.0_121]
at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:206)[162:org.apache.activemq.activemq-osgi:5.11.0.redhat-630187]
... 24 more
Anders being just a class I made to test compression.
My test program in groovy:
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.*
class Anders implements Serializable {
String name = "Anders Andersson"
int age=120
}
def brokerUrl = 'tcp://localhost:61616'
def queue = 'andersPanders'
def reader = new BufferedReader(new InputStreamReader(System.in))
new ActiveMQConnectionFactory(brokerURL: brokerUrl, userName: "admin", password: "admin").createConnection().with {
start()
createSession(false, Session.AUTO_ACKNOWLEDGE).with {
def message = createObjectMessage(new Anders())
createProducer().send(createQueue(queue), message)
}
close()
}
Upvotes: 1
Views: 1302
Reputation: 22279
Disclaimer: ObjectMessage is an anti-pattern in JMS as it introduces tight coupling and and comes with security issues.
That said, a solution that works only with ActiveMQ and not JMS in generic:
<route id="routeme">
<from uri="activemq:inputqueue?mapJmsMessage=false" />
<setBody>
<simple>${body.getContent().getData()}</simple>
</setBody>
<marshal ref="compressor"/>
<to uri="activemq:outputqueue" />
</route>
The mapJmsMessage=false
makes Camel avoid reading the actual Object and keeping the body as ActiveMQObjectMessage
. Using that object, you can actually get hold of the byte[]
backing the ObjectMessage. The output of above route will be a BytesMessage
.
Upvotes: 1