Reputation: 576
I have message producers that are sending JMS messages about some events using ActiveMQ. However, connection to ActiveMQ might not be up all the time. Thus, events are stored and when connection is established they are suppose to be read and sent over. Here is my code:
private void sendAndSave(MyEvent event) {
boolean sent = sendMessage(event);
event.setProcessed(sent);
boolean saved = repository.saveEvent(event);
if (!sent && !saved) {
logger.error("Change event lost for Id = {}", event.getId());
}
}
private boolean sendMessage(MyEvent event) {
try {
messenger.publishEvent(event);
return true;
} catch (JmsException ex) {
return false;
}
}
I'd like to create some kind of ApplicationEventListener that will be invoked when connection is established and process unsent events. I went through JMS, Spring framework and ActiveMQ documentation but couldn't find any clues how to hook up my listener with ConnectionFactory.
If someone can help me out, I'll appreciate it greatly.
Here is what my app Spring context says about JMS:
<!-- Connection factory to the ActiveMQ broker instance. -->
<!-- The URI and credentials must match the values in activemq.xml -->
<!-- These credentials are shared by ALL producers. -->
<bean id="jmsTransportListener" class="com.rhd.ams.service.common.JmsTransportListener"
init-method="init" destroy-method="cleanup"/>
<bean id="amqJmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${jms.publisher.broker.url}"/>
<property name="userName" value="${jms.publisher.username}"/>
<property name="password" value="${jms.publisher.password}"/>
<property name="transportListener" ref="jmsTransportListener"/>
</bean>
<!-- JmsTemplate, by default, will create a new connection, session, producer for -->
<!-- each message sent, then close them all down again. This is very inefficient! -->
<!-- PooledConnectionFactory will pool the JMS resources. It can't be used with consumers.-->
<bean id="pooledAmqJmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory" ref="amqJmsConnectionFactory" />
</bean>
<!-- Although JmsTemplate instance is unique for each message, it is -->
<!-- thread-safe and therefore can be injected into referenced obj's. -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="pooledAmqJmsConnectionFactory"/>
</bean>
Upvotes: 3
Views: 3728
Reputation: 16056
The way you describe the issue, it sure sounds like an open-and-shut case of JMS Durable Subscriptions. You might want to consider a more traditional implementation before going down this road. Caveats aside, ActiveMQ provides Advisory Messages which you can listen for and which will be sent for various events including new connections.
=========
Shoot, sorry... I did not understand what the issue was. I don't think Advisories are the solution at all.... after all, you need to be connected to the broker to get them, but being connected is what you know about.
So if I understand it correctly (prepare for retry #2....), what you need is a client connection which, when it fails, attempts to reconnect indefinitely. When it does reconnect, you want to trigger an event (or more) that flushes pending messages to the broker.
So detecting the lost connection is easy. You just register a JMS ExceptionListener. As far as detecting a reconnect, the simplest way I can think of is to start a reconnect thread. When it connects, stop the reconnect thread and notify interested parties using Observer/Observable or JMX notifications or the like. You could use the ActiveMQ Failover Transport which will do a connection retry loop for you, even if you only have one broker. At least, it is supposed to, but it's not doing that much for you that would not be done by your own reconnect thread... but if you're willing to delegate some control to it, it will cache your unflushed messages (see the trackMessages option), and then send them when it reconnects, which is sort of all of what you're trying to do.
I guess if your broker is down for a few minutes, that's not a bad way to go, but if you're talking hours, or you might accumulate 10k+ messages in the downtime, I just don't know if that cache mechanism is as reliable as you would need it to be.
==================
Mobile app ... right. Not really appropriate for the failover transport. Then I would implement a timer that periodically connects (might be a good idea to use the http transport, but not relevant). When it does connect, if there's nothing to flush, then see you in x minutes. If there is, send each message, wait for a handshake and purge the message from you mobile store. Then see you again in x minutes.
I assume this is Android ? If not, stop reading here. We actually implemented this some time ago. I only did the server side, but if I remember correctly, the connection timer/poller spun every n minutes (variable frequencies, I think, because getting too aggressive was draining the battery). Once a successful connection was made, I believe they used an intent broadcast to nudge the message pushers to do their thing. The thinking was that even though there was only one message pusher, we might add more.
Upvotes: 1