Reputation: 21
I'm using Mule 3.5.0 CE. I have two requirements:
To manage this with Mule, I was thinking of using an ActiveMQ BlobMessage to carry the payload, together with a reliable acquisition pattern. First of all, is this the best approach?
Here is what I have created:
However, if ActiveMQ crashes, I lose messages.
I get some warnings in Mule, "Failure trying to remove file '...' from list of files under processing", and the following error:
ERROR 2015-05-23 12:55:38,291 [[opx].File.receiver.01] org.mule.exception.DefaultMessagingExceptionStrategy:
********************************************************************************
Message : Cannot process event as "Active_MQ" is stopped
Type : org.mule.api.lifecycle.LifecycleException
Code : MULE_ERROR-70167
JavaDoc : http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/lifecycle/LifecycleException.html
********************************************************************************
Exception stack is:
1. Cannot process event as "Active_MQ" is stopped (org.mule.api.lifecycle.LifecycleException)
  org.mule.lifecycle.processor.ProcessIfStartedMessageProcessor:38 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/lifecycle/LifecycleException.html)
********************************************************************************
Root Exception stack trace:
org.mule.api.lifecycle.LifecycleException: Cannot process event as "Active_MQ" is stopped
    at org.mule.lifecycle.processor.ProcessIfStartedMessageProcessor.handleUnaccepted(ProcessIfStartedMessageProcessor.java:38)
    at org.mule.processor.AbstractFilteringMessageProcessor.process(AbstractFilteringMessageProcessor.java:44)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.endpoint.DefaultOutboundEndpoint.process(DefaultOutboundEndpoint.java:100)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.construct.DynamicPipelineMessageProcessor.process(DynamicPipelineMessageProcessor.java:54)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.interceptor.AbstractEnvelopeInterceptor.process(AbstractEnvelopeInterceptor.java:51)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.processor.AbstractFilteringMessageProcessor.process(AbstractFilteringMessageProcessor.java:40)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.construct.AbstractPipeline$1.process(AbstractPipeline.java:109)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.construct.AbstractPipeline$3.process(AbstractPipeline.java:207)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule.execution.MessageProcessorNotificationExecutionInterceptor.execute(MessageProcessorNotificationExecutionInterceptor.java:58)
    at org.mule.execution.MessageProcessorExecutionTemplate.execute(MessageProcessorExecutionTemplate.java:44)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:24)
    at org.mule...
********************************************************************************
EDIT: Here is the configuration.
Flow:
<jms:activemq-connector name="Active_MQ" specification="1.1" brokerURL="tcp://localhost:61616?jms.redeliveryPolicy.initialRedeliveryDelay=3000&amp;jms.blobTransferPolicy.defaultUploadUrl=http://localhost:8161/fileserver/" validateConnections="true" maxRedelivery="-1" cacheJmsSessions="false" persistentDelivery="true" doc:name="Active MQ">
    <reconnect frequency="60000" count="20"/>
</jms:activemq-connector>
<file:connector name="File" workDirectory="/home/fs/workDirectory" autoDelete="true" streaming="true" validateConnections="true"/>
<flow name="ReceiveFromFS" processingStrategy="synchronous">
    <file:inbound-endpoint path="/home/fs/in" pollingFrequency="5000" fileAge="1000" connector-ref="File"/>
    <component class="mypackage.InputStreamToBlobMessage" doc:name="Java"/>
    <jms:outbound-endpoint queue="queue1" connector-ref="Active_MQ" doc:name="JMS"/>
</flow>
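My Java component:
package mypackage;

import java.io.InputStream;

import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
import org.mule.api.MuleEventContext;
import org.mule.api.MuleMessage;
import org.mule.api.lifecycle.Callable;
import org.mule.transport.jms.JmsConnector;

public class InputStreamToBlobMessage implements Callable {
    @Override
    public Object onCall(MuleEventContext eventContext) throws Exception {
        MuleMessage muleMsg = eventContext.getMessage();
        // The streaming file connector delivers the file as an InputStream
        InputStream is = (InputStream) muleMsg.getPayload();
        JmsConnector amqConnector = (JmsConnector) eventContext.getMuleContext().getRegistry().lookupConnector("Active_MQ");
        BlobMessage bm = null;
        if (amqConnector.isConnected()) {
            // Non-transacted session for a queue (not a topic)
            ActiveMQSession session = (ActiveMQSession) amqConnector.getSession(false, false);
            bm = session.createBlobMessage(is);
        }
        // Stays null when the connector is not connected
        return bm;
    }
}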
Upvotes: 2
Views: 1163
Reputation: 21
Here is the config I used to make it work:
<jms:activemq-connector name="Active_MQ" specification="1.1" brokerURL="tcp://localhost:61616" connectionFactory-ref="connectionFactory" validateConnections="true" maxRedelivery="-1" persistentDelivery="true" doc:name="Active MQ">
</jms:activemq-connector>
<spring:beans>
    <spring:bean name="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" doc:name="Bean">
        <spring:property name="redeliveryPolicy">
            <spring:bean name="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
                <spring:property name="redeliveryDelay" value="60000"/>
                <spring:property name="maximumRedeliveries" value="20"/>
                <spring:property name="initialRedeliveryDelay" value="10000"/>
            </spring:bean>
        </spring:property>
        <spring:property name="blobTransferPolicy">
            <spring:bean name="blobTransferPolicy" class="org.apache.activemq.blob.BlobTransferPolicy">
                <spring:property name="defaultUploadUrl" value="http://localhost:8161/fileserver/"/>
            </spring:bean>
        </spring:property>
    </spring:bean>
</spring:beans>
<file:connector name="File" workDirectory="/home/fs/workDirectory" autoDelete="true" streaming="true" validateConnections="true" doc:name="File"/>
<flow name="ReceiveFromFS" processingStrategy="synchronous">
    <file:inbound-endpoint path="/home/fs/in" pollingFrequency="5000" fileAge="1000" connector-ref="File"/>
    <component class="mypackage.InputStreamToBlobMessage" doc:name="Java"/>
    <jms:outbound-endpoint queue="queue1" connector-ref="Active_MQ" doc:name="JMS">
        <jms:transaction action="ALWAYS_BEGIN"/>
    </jms:outbound-endpoint>
</flow>
However, when I used a connectionFactory in Mule Studio, it did not work properly and did not handle the retries. Is this a bug? The same config worked fine in Mule embedded in Tomcat.
One more thing: in Mule 3.6 the JMS sessions are cached by default, so either the Session has to be accessed the way I did in the component, or cacheJmsSessions="false" has to be used.
Voilà :)
Upvotes: 0
Reputation: 4704
In this case it looks like your reconnection attempts have been exhausted, which is why your connector stays in the 'stopped' state.
Try replacing your reconnection strategy with:
<reconnect-forever frequency="60000" />
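To illustrate why a bounded strategy can leave the connector stopped, here is a rough sketch in plain Java. It is a simplified model and not Mule's actual reconnection code: it assumes one attempt every `frequency` milliseconds, starting one interval after the failure, and it uses a negative `count` as a stand-in for reconnect-forever. The class and method names are made up for this example.

```java
import java.util.OptionalLong;

public class ReconnectSketch {

    /**
     * Simplified model (an assumption, not Mule's implementation):
     * attempt k happens at k * frequencyMillis after the failure.
     * A negative count stands in for reconnect-forever.
     * Returns the time of the first attempt made once the broker is back,
     * or empty if the attempts run out while the broker is still down.
     */
    public static OptionalLong firstSuccessfulAttempt(long frequencyMillis, int count, long outageMillis) {
        for (int attempt = 1; count < 0 || attempt <= count; attempt++) {
            long attemptTime = attempt * frequencyMillis;
            if (attemptTime >= outageMillis) {
                return OptionalLong.of(attemptTime);
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        long frequency = 60_000L;            // frequency="60000" from the question
        long thirtyMinutes = 30 * 60_000L;   // hypothetical broker outage

        // count="20": the last attempt is at 20 minutes, so nothing succeeds
        System.out.println(firstSuccessfulAttempt(frequency, 20, thirtyMinutes));
        // reconnect-forever: the attempt at 30 minutes succeeds
        System.out.println(firstSuccessfulAttempt(frequency, -1, thirtyMinutes));
    }
}
```

Under this model, frequency="60000" with count="20" only covers about 20 minutes of broker downtime. Any longer outage uses up all attempts, which would match the "stopped" connector in your log.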
Upvotes: 0