Reputation: 65
Our application built and runs on :-
spring.version = 3.2.8.RELEASE
spring.amqp.version = 1.2.0.RELEASE
rabbitmq.version = 3.1.3 (client)
RabbitMQ Server version is 3.1.5
We wanted to upgrade the rabbitmq server from 3.1.5 to 3.3.5 and we did that successfully.
Now we wanted to upgrade the application to use latest version of spring-amqp, RabbitMQ java client, so we have upgrade the following components :-
spring.version = 3.2.8.RELEASE
spring.amqp.version = 1.3.0.RELEASE
rabbitmq.version = 3.2.4 (client)
RabbitMQ Server version is 3.3.5
However after upgrading to spring-amqp to 1.3.0, our application started to hung. Basically we start many listener containers during the application startup, and starting each listener container now takes exactly 60 secs to allow to the next step
after digging deep I found that, the program gets hanging in run() method in SimpleMessageListenerContainer class :-
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
public void run() {
boolean aborted = false;
int consecutiveIdles = 0;
int consecutiveMessages = 0;
try {
try {
SimpleMessageListenerContainer.this.redeclareElementsIfNecessary(); // Here is where the programing thread hangs.
this.consumer.start();
this.start.countDown();
}
As mentioned above code, the thread gets hung at redeclareElementsIfNecessary() method, and this method is introduced in this version of spring-rabbit only. I have no clue why its getting hanging there, whatever suggested parameter's that I pass to this SimpleMessageListenerContainer it doesn't seems to be working.
If I revert back to spring-amqp 1.2.0 release with the new RabbitMQ server 3.3.5 all seems to be working fine, but things are not working with new spring-amqp client.
I am kind of stuck here for couple of days now. Spring / Rabbitmq master's out there, can you please help me solving this problem?
Thanks for your quick response, however it seems the code is not reaching to that point and it just get hung just above the snippet that you have provide, I have comment below on where the code gets hung exactly below
Set<String> queueNames = this.getQueueNamesAsSet();
Map<String, Queue> queueBeans = ((ListableBeanFactory) applicationContext).getBeansOfType(Queue.class); // The code started to hung here
for (Entry<String, Queue> entry : queueBeans.entrySet()) {
Queue queue = entry.getValue();
if (queueNames.contains(queue.getName()) && queue.isAutoDelete()
&& this.rabbitAdmin.getQueueProperties(queue.getName()) == null) {
if (logger.isDebugEnabled()) {
logger.debug("At least one auto-delete queue is missing: " + queue.getName()
+ "; redeclaring context exchanges, queues, bindings.");
}
this.rabbitAdmin.initialize();
break;
}
}
Actually we have upgraded to the latest version of spring-amqp only, that is
spring.version = 3.2.8.RELEASE
spring.amqp.version = 1.3.6.RELEASE
rabbitmq.version = 3.3.4 (client)
RabbitMQ Server version is 3.3.5
however we faced exactly the same issue, so just to find out from which version the issue started I ran down to the lower versions upto 1.3.0, seems the issue is starting in 1.3.0 version of spring-amqp itself. that's the reason.
I have attached the requested information including thread dumps which are based on spring-amqp 1.3.6 only.
Here is the configuration our listener container where the programs hangs, as you can see we have our own SimpleMessageLinstenerContainer which acts as a wrapper for the acutal spring's SimpleMessageListenerContainer, I have also attached this custom wrapper file for your reference.
<bean id="tlogOutOfCycleMessageListenerPrototype" class="com.myorg.ips.cnccommon.support.amqp.SimpleMessageListenerContainer" scope="prototype">
<property name="channelTransacted" value="true" />
<property name="transactionManager" ref="transactionManager" />
<property name="concurrentConsumers" value="1" />
<property name="taskExecutor" ref="tlogOutOfCycleMessageListenerPool" />
<property name="messageListener" ref="tlogMLAOutOfCycle" />
<property name="errorHandler" ref="tlogOutOfCycleMessageHandler" />
<property name="autoStartup" value="false" />
<property name="instanceNameForLogging" value="site1TlogOutOfCycleMessageListener"/>
<!-- A dummy connection factory which will never be used -->
<property name="connectionFactory" ref="switchCompositeConnectionFactoryPrototype"/>
</bean>
Our wrapper class SimpleMessageListenerContainer.java
package com.myorg.ips.cnccommon.support.amqp;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.slf4j.cal10n.LocLogger;
import org.springframework.util.ErrorHandler;
import com.myorg.ips.amqp.SwitchSiteSupport;
import com.myorg.ips.logging.LoggerFactory;
import com.myorg.ips.system.config.InitialisableSiteAware;
import static com.myorg.ips.logging.SystemWideLogMessages.ERROR_AMQP_FAILED_TO_START_LISTENER;
import static com.myorg.ips.logging.SystemWideLogMessages.INFO_AMQP_STOPPING_LISTENER;
/**
*
* Wrapper for the Spring SimpleMessageListenerContainer which simply allows us to delay (or prevent startup). Can also restart on command.
*
*/
public class SimpleMessageListenerContainer extends org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer implements InitialisableSiteAware {
private static final LocLogger logger = LoggerFactory.getLogger(SimpleMessageListenerContainer.class);
private boolean autoStart = true;
private ErrorHandler exposedErrorHandler;
private boolean springBeanInitialisationAttempted = false;
private boolean springBeanInitialised = false;
private String instanceNameForLogging;
@Override
public void initialize() {
// Do nothing -- we will instead perform the Spring bean initialisation later on via the factory bean, after the connection factory has been set
springBeanInitialisationAttempted = true;
springBeanInitialised = false;
}
@Override
public void initialise() {
SwitchSiteSupport.initialiseIfSiteAware(getMessageListener());
SwitchSiteSupport.initialiseIfSiteAware(getErrorHandler());
// If this object is a Spring bean, we should now complete the initialisation that the Spring framework attempted earlier
if (springBeanInitialisationAttempted && !springBeanInitialised) {
springBeanInitialised = true;
super.initialize();
if (isAutoStartup()) {
start();
}
}
}
@Override
public void configureForSite(final MultiHostConnectionFactory configuredConnectionFactory) {
setConnectionFactory(configuredConnectionFactory);
SwitchSiteSupport.configureIfSiteAware(getMessageListener(), configuredConnectionFactory);
SwitchSiteSupport.configureIfSiteAware(getErrorHandler(), configuredConnectionFactory);
setInstanceNameForLogging(SwitchSiteSupport.replaceWithSiteAlias(instanceNameForLogging, configuredConnectionFactory));
}
@Override
//CHECKSTYLE:OFF Unfortunately the parent springframework class throws and exception, so so do we
protected void doStart() throws Exception {
//CHECKSTYLE:ON
if (autoStart) {
logger.debug("Starting message listener " + instanceNameForLogging);
super.doStart();
logger.debug("Started message listener " + instanceNameForLogging);
}
}
/**
* Start this listener
*/
public void start() {
autoStart = true;
try {
doStart();
//CHECKSTYLE:OFF Unfortunately the parent springframework class throws and exception, so that is what we catch
} catch (Exception e) {
//CHECKSTYLE:ON
logger.error(ERROR_AMQP_FAILED_TO_START_LISTENER, e);
}
}
/**
* Stop listener
*/
public void stop() {
logger.info(INFO_AMQP_STOPPING_LISTENER, getBeanName());
autoStart = false;
doStop();
}
/**
* Stop and start this listener
*/
public void restart() {
stop();
start();
}
/**
* Store the errorHandler in a subclass-specific property so that we can retrieve it later
* @param errorHandler errorHandler
*/
@Override
public void setErrorHandler(final ErrorHandler errorHandler) {
this.exposedErrorHandler = errorHandler;
super.setErrorHandler(errorHandler);
}
/**
* Return the exposed errorHandler
* @return errorHandler
*/
public ErrorHandler getErrorHandler() {
return exposedErrorHandler;
}
public void setInstanceNameForLogging(final String instanceNameForLogging) {
this.instanceNameForLogging = instanceNameForLogging;
}
@Override
public String toString(){
return ToStringBuilder.reflectionToString(this);
}
}
Upvotes: 1
Views: 495
Reputation: 121292
Good catch anyway!
I'm just right now working with that code for Spring AMQP 1.4.
Would you mind sharing:
ListenerContainer
on which you hangredeclareElementsIfNecessary()
Actually now that code looks like:
if (queueNames.contains(queue.getName()) && queue.isAutoDelete()
&& this.rabbitAdmin.getQueueProperties(queue.getName()) == null) {
if (logger.isDebugEnabled()) {
logger.debug("At least one auto-delete queue is missing: " + queue.getName()
+ "; redeclaring context exchanges, queues, bindings.");
}
this.rabbitAdmin.initialize();
break;
}
So, it may heppen only on the auto-delete
Queue.
Or do you have another picture?..
UPDATE
According to your ThreadDump. This is illegal:
at com.vocalink.ips.amqp.AmqpMessageListenerManager.initialise(AmqpMessageListenerManager.java:106)
at com.vocalink.ips.amqp.SwitchSiteSupport.initialiseIfSiteAware(SwitchSiteSupport.java:29)
at com.vocalink.ips.system.config.AbstractSiteAwareComponentCachingFactory.createAndConfigureSiteAwareComponent(AbstractSiteAwareComponentCachingFactory.java:51)
You can't start
component within initialization phase. Or leave it to the container, or just do start
manually somewhere at runtime, when all of your beans are already created.
For example you can do that using ApplicationListener<ContextRefreshedEvent>
.
Upvotes: 0