Reputation: 271
I am listening to an ActiveMQ queue.
My configuration is as follows:
ACTIVEMQ_BROKER_URL =failover:(tcp://LON6QAEQTAPP01:61616?wireFormat.maxInactivityDuration=0)
From the console logs I can see that reconnect attempts to this server succeed:
[2014-08-20 08:57:43,303] INFO 236[ActiveMQ Task-1] - org.apache.activemq.transport.failover.FailoverTransport.doReconnect(FailoverTransport.java:1030) - Successfully connected to tcp://LON6QAEQTAPP01:61616?wireFormat.maxInactivityDuration=0
[2014-08-20 08:57:43,355] INFO 288[ActiveMQ Task-1] - org.apache.activemq.transport.failover.FailoverTransport.doReconnect(FailoverTransport.java:1030) - Successfully connected to tcp://LON6QAEQTAPP01:61616?wireFormat.maxInactivityDuration=0
[2014-08-20 08:57:43,374] INFO 307[ActiveMQ Task-1] - org.apache.activemq.transport.failover.FailoverTransport.doReconnect(FailoverTransport.java:1030) - Successfully connected to tcp://LON6QAEQTAPP01:61616?wireFormat.maxInactivityDuration=0
Even so, I am not able to consume messages. I am using the following code to try to handle this on the client side, but I still cannot keep the connection alive.
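try
{
    connection.start();
    while (true)
    {
        try
        {
            // Check every 30 seconds whether the connection or session has dropped
            if (connection.isClosed() || connection.isTransportFailed() || session.isClosed() || !connection.getTransport().isConnected())
            {
                log.info("Connection was reset, re-connecting..");
                connection.cleanup();
                init();
                connection.start();
            }
        }
        catch (Throwable t)
        {
            init();
            log.error("Connection was reset, re-connecting in exception block", t);
        }
        Thread.sleep(30000L);
    }
}
catch (Exception e)
{
    // Reached if start(), the init() in the catch block, or sleep() throws
    log.error("Connection monitor loop failed", e);
}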
private void init() throws Exception
{
    init(brokerURL, queueName, userName, password, manual);
}
public void init(String brokerURL, String queueName, String userName, String password, boolean manual) throws Exception
{
    this.manual = manual;
    this.brokerURL = brokerURL;
    this.queueName = queueName;
    this.userName = userName;
    this.password = password;
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, password, brokerURL);
    connection = (ActiveMQConnection) connectionFactory.createConnection();
    connection.addTransportListener(new TransportListener()
    {
        @Override
        public void transportResumed()
        {
            log.info("Transport Resumed ");
        }
        @Override
        public void transportInterupted()
        {
            log.info("Transport Interupted ");
        }
        @Override
        public void onException(IOException arg0)
        {
            arg0.printStackTrace();
            log.error("Transport Exception: " + arg0.getMessage());
        }
        @Override
        public void onCommand(Object arg0)
        {
            // TODO Auto-generated method stub
        }
    });
    connection.setExceptionListener(this);
    connection.setCloseTimeout(10);
    session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);// CLIENT_ACKNOWLEDGE
    Destination destination = session.createQueue(queueName);
    MessageConsumer consumer = session.createConsumer(destination);
    consumer.setMessageListener(this);
}
Can someone help me keep this connection alive all the time? It generally times out after about 40 minutes of inactivity.
Upvotes: 1
Views: 3252
Reputation: 18356
Trying to force a reconnect on an open connection the way you are is definitely not going to do you any good. If you want to handle connection interruptions yourself, you need to tear down the connection and create a new one. What you really should do is figure out what is closing the connection on you; it is probably a firewall or a load balancer shutting down the inactive connection. Look at your environment and see what else is in the mix that could be closing it.
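To illustrate the "tear down and create a new one" idea, here is a minimal sketch. It deliberately avoids the ActiveMQ classes so it stays self-contained: `ReconnectSupervisor` is a hypothetical helper of my own, not an ActiveMQ or JMS API, and the factory stands in for whatever your `init()` does to build a connection. With JMS 1.1 you would need a small wrapper around the connection, since `javax.jms.Connection` only became `AutoCloseable` in JMS 2.0.

```java
import java.util.concurrent.Callable;

/**
 * Hypothetical helper (not part of ActiveMQ or JMS): owns one connection-like
 * resource and replaces it wholesale instead of restarting it in place.
 */
final class ReconnectSupervisor<C extends AutoCloseable> {
    // Builds a brand-new connection each time it is called (assumed to do what init() does).
    private final Callable<C> factory;
    private C current;

    ReconnectSupervisor(Callable<C> factory) {
        this.factory = factory;
    }

    /** Returns the current resource, creating it on first use. */
    synchronized C get() throws Exception {
        if (current == null) {
            current = factory.call();
        }
        return current;
    }

    /** Closes the old resource completely, then builds a fresh one. */
    synchronized C reconnect() throws Exception {
        C old = current;
        current = null;
        if (old != null) {
            try {
                old.close(); // release the dead connection before replacing it
            } catch (Exception e) {
                // The old connection is already broken, so a failed close is tolerated.
            }
        }
        current = factory.call();
        return current;
    }
}
```

The point of the design is that the old object is never reused: a session and consumer created on the dead connection go away with it, and everything is rebuilt from the factory.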
The broker can, if configured to do so, rebalance clients in a cluster. Are you running multiple clients in a clustered environment?
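If an idle timeout in a firewall or load balancer does turn out to be the cause, one thing worth trying is to leave ActiveMQ's inactivity monitor switched on. The broker URL in the question sets `wireFormat.maxInactivityDuration=0`, and a value of 0 or less disables inactivity monitoring, which as far as I know also stops the keep-alive heartbeats that would otherwise give an idle connection some traffic. The sketch below only builds the URL string; `BrokerUrls` and `failoverWithHeartbeat` are hypothetical names, and whether 30000 ms suits your network is an assumption to verify.

```java
/** Hypothetical helper for building the broker URL; not an ActiveMQ class. */
final class BrokerUrls {
    /**
     * Builds a failover URL whose inner TCP transport keeps the
     * inactivity monitor enabled with the given duration in milliseconds.
     */
    static String failoverWithHeartbeat(String host, int port, long maxInactivityMillis) {
        if (maxInactivityMillis <= 0) {
            // A value <= 0 would disable inactivity monitoring altogether.
            throw new IllegalArgumentException("maxInactivityMillis must be positive");
        }
        return "failover:(tcp://" + host + ":" + port
                + "?wireFormat.maxInactivityDuration=" + maxInactivityMillis + ")";
    }
}
```

For example, `BrokerUrls.failoverWithHeartbeat("LON6QAEQTAPP01", 61616, 30000)` gives `failover:(tcp://LON6QAEQTAPP01:61616?wireFormat.maxInactivityDuration=30000)`.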
Upvotes: 1