Reputation: 1611
I have a Spring JmsListener in my code. It receives and consumes messages for about two days, but then it suddenly stops receiving messages from the external ActiveMQ broker, even though messages are pending in its queue. When I restart ActiveMQ and the consumer, the consumer receives a large number of messages. While the messages are pending, the consumer is still connected to ActiveMQ (according to the Spring Actuator logs). The logs and config suggest that ActiveMQ did not push the messages to the consumer. I have another service like this consumer that receives messages from a different queue, and it has the same problem. I want to know why this happens and how I can solve it. Here are my config and logs:
Consumer:
application.xml:
spring.jms.pub-sub-domain=false
spring.jms.template.delivery-mode=persistent
spring.activemq.broker-url=${BROKER_URL:failover:(tcp://activemq1:61616,tcp://activemq2:61616)?maxReconnectDelay=2500}
spring.activemq.user=${BROKER_USER:admin}
spring.activemq.password=${BROKER_PASSWORD:admin}
JmsConfiguration:
@EnableJms
@Configuration
public class JmsConfiguration {

    @Bean
    public JmsListenerContainerFactory<?> myFactory(ActiveMQConnectionFactory connectionFactory,
                                                    DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setMaximumRedeliveries(0);
        connectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        factory.setMessageConverter(messageConverter());
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    private MessageConverter messageConverter() {
        MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
        messageConverter.setTargetType(MessageType.TEXT);
        messageConverter.setTypeIdPropertyName("_type");
        return messageConverter;
    }
}
Listener:
@Component
public class MessageReceiver {

    @JmsListener(destination = Constant.OFFICE_REQUEST_QUEUE, containerFactory = "myFactory")
    public void receive(RequestMessage requestMessage, @Headers Map<String,Object> headers) throws NonPersistenceServiceException {
        // do something with the received message
    }
}
ActiveMQ config:
activemq.xml:
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>
<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
</bean>
<bean id="oracleDS" class="oracle.jdbc.pool.OracleDataSource" destroy-method="close">
<property name="URL" value="jdbc:oracle:thin:@(DESCRIPTION= (SDU=32768)(ADDRESS=(PROTOCOL=TCP)(HOST= dms-db1-vip)(PORT=1521)) (ADDRESS=(PROTOCOL=TCP)(HOST= dms-db2-vip)(PORT=1521)) (LOAD_BALANCE=yes)(FAILOVER=ON)(CONNECT_DATA=(SERVER=DEDICATED)(SERVICE_NAME=orcl)(failover_mode=(type=select)(method=basic)(retries=5)(delay=1))))"/>
<property name="user" value="ora_user"/>
<property name="password" value="ora_pass"/>
</bean>
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" >
<plugins>
<redeliveryPlugin fallbackToDeadLetter="true"
sendToDlqIfMaxRetriesExceeded="true">
<redeliveryPolicyMap>
<redeliveryPolicyMap>
<defaultEntry>
<redeliveryPolicy
useExponentialBackOff="true"
backOffMultiplier="2"
maximumRedeliveryDelay="1200000"
maximumRedeliveries="82"/>
</defaultEntry>
</redeliveryPolicyMap>
</redeliveryPolicyMap>
</redeliveryPlugin>
</plugins>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">">
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#oracleDS" />
</persistenceAdapter>
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
<shutdownHooks>
<bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
</shutdownHooks>
</broker>
<import resource="jetty.xml"/>
</beans>
Below is the actuator log from the consumer:
{"log":"20:20:26.775 @@@ [ActiveMQ Task-1] INFO o.a.activemq.transport.failover.FailoverTransport @@@ @@@ @@@\r\n","stream":"stdout"}
{"log":" Successfully connected to tcp://activemq1:61616\r\n","stream":"stdout"}
{"log":"20:20:36.638 @@@ [ActiveMQ Task-1] INFO o.a.activemq.transport.failover.FailoverTransport @@@ @@@ @@@\r\n","stream":"stdout"}
{"log":" Successfully connected to tcp://activemq1:61616\r\n","stream":"stdout"}
{"log":"20:20:36.779 @@@ [ActiveMQ Task-1] INFO o.a.activemq.transport.failover.FailoverTransport @@@ @@@ @@@\r\n","stream":"stdout"}
{"log":" Successfully connected to tcp://activemq1:61616\r\n","stream":"stdout"}
{"log":"20:20:46.464 @@@ [ActiveMQ Task-1] INFO o.a.activemq.transport.failover.FailoverTransport @@@ @@@ @@@\r\n","stream":"stdout"}
{"log":" Successfully connected to tcp://activemq1:61616\r\n","stream":"stdout"}
{"log":"20:20:46.774 @@@ [ActiveMQ Task-1] INFO o.a.activemq.transport.failover.FailoverTransport @@@ @@@ @@@\r\n","stream":"stdout"}
ActiveMQ error log:
2019-03-05 20:19:55,448 | WARN | Transport Connection to: tcp://10.42.1.0:63493 failed: java.io.IOException: Frame size of 1 GB larger than max allowed 100 MB | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ Transport: tcp:///10.42.1.0:63493@61616
2019-03-05 20:20:05,776 | WARN | Transport Connection to: tcp://10.42.1.0:63498 failed: java.io.EOFException | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ Transport: tcp:///10.42.1.0:63498@61616
2019-03-05 20:20:05,776 | WARN | Transport Connection to: tcp://10.42.1.0:63497 failed: java.io.EOFException | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ Transport: tcp:///10.42.1.0:63497@61616
Upvotes: 4
Views: 5870
Reputation: 174554
Such scenarios are most often the result of one of two issues:
Some network component (router, firewall) silently closes a socket due to inactivity without notifying the client or the server, so, unless heartbeats are enabled, the connection is dead and neither side notices.
The consumer thread is somehow "stuck" in user code so it stops receiving new messages; take a stack dump to see what the container threads are doing.
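For the second cause, a stack dump can also be produced from inside the consumer JVM. The following is a minimal sketch, not taken from the question's code: the class name `ListenerThreadDump` and the default name filter `"ListenerContainer"` are assumptions for illustration, and the actual listener thread names depend on how the container is configured, so adjust the filter to match what appears in your logs.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper (not part of Spring or ActiveMQ): prints the stack of
// every live thread in this JVM whose name contains the given substring.
public class ListenerThreadDump {

    public static String dumpThreads(String nameFilter) {
        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
        StringBuilder out = new StringBuilder();
        // false, false: skip lock/monitor details, we only need the stack frames
        for (ThreadInfo info : threadBean.dumpAllThreads(false, false)) {
            if (info == null || !info.getThreadName().contains(nameFilter)) {
                continue;
            }
            out.append('"').append(info.getThreadName()).append("\" ")
               .append(info.getThreadState()).append('\n');
            for (StackTraceElement frame : info.getStackTrace()) {
                out.append("    at ").append(frame).append('\n');
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Assumed default filter; pass another substring as the first argument.
        String filter = args.length > 0 ? args[0] : "ListenerContainer";
        System.out.print(dumpThreads(filter));
    }
}
```

`dumpThreads` only sees threads of the JVM it runs in, so it has to be called from within the consumer application (for example from a temporary diagnostic endpoint). Running the JDK's `jstack <pid>` against the consumer process, or using the Spring Boot Actuator thread dump endpoint if it is exposed, gives the same information without code changes. If the listener threads show frames in your own `receive` method rather than waiting inside the JMS client's receive call, the consumer is stuck in user code.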
Upvotes: 1