Aashish

Reputation: 7

Not all subscribers receive messages in a clustered Spring Integration RabbitMQ application

I am developing a demo chat application using Spring Integration, WebSocket, and a RabbitMQ server. When I run the application on a single server, it works fine.

All messages sent by the producer are received by the consumer. But when I run it in a clustered environment, each message is received by only one, seemingly random, server rather than by all servers.

I don't know whether the problem is in my code or in the configuration.

I checked with logging. The logs show that each message is sent successfully, but it is received by only one server instead of all of them.

Below are the classes I am using, along with the configuration.

ChatController.java

@Controller
public class ChatController {

private Logger logger = LoggerFactory.getLogger(getClass());

@RequestMapping(value = "/", method = RequestMethod.GET)
public String viewApplication() {
    return "index";
}
@Autowired
private AmqpTemplate reviewTemplate;

@MessageMapping(value = "/random")
public void sendDataUpdates(OutputMessage message) {
    try {
        System.out.println("<<<<<<< Sending Message <<<<<<<<<<" + message.getMessage() + "   ID : " + message.getId());
        sendMessages(message);
    } catch (Exception ex) {
        System.out.println("Exception ------>>>>>> " + ex);
    }
}

private void sendMessages(OutputMessage msg) {
    reviewTemplate.convertAndSend(msg);
}

}

RandomDataGenerator.java

@Component
public class RandomDataGenerator implements
    ApplicationListener<BrokerAvailabilityEvent> {

private final MessageSendingOperations<String> messagingTemplate;

@Autowired
public RandomDataGenerator(
        final MessageSendingOperations<String> messagingTemplate) {
    this.messagingTemplate = messagingTemplate;
}

@Override
public void onApplicationEvent(final BrokerAvailabilityEvent event) {
}

public void onMessage(GenericMessage<?> msg) {
    try {
        System.out.println("Message ====== >>>>> " + msg);
        OutputMessage message = (OutputMessage) msg.getPayload();
        this.messagingTemplate.convertAndSend(
                "/data", message);    

        System.out.println("Message ====== >>>>> " + message.getMessage());           
    } catch (Exception ex) {
        System.out.println("==================== " + ex);
    }
    finally {
    }
}    

}

webapp-config.xml

<rabbit:annotation-driven />

<rabbit:admin id="rabbitAdmin" connection-factory="rabbitConnectionFactory" auto-startup="true" />

<rabbit:connection-factory id="rabbitConnectionFactory" 
                           connection-timeout="5000" publisher-returns="true"
                           channel-cache-size="32" cache-mode="CHANNEL"
                           host="localhost" username="guest" password="guest" port="5672" 
                           publisher-confirms="true" requested-heartbeat="5000" />

<rabbit:fanout-exchange name="reviewExchange" id="reviewExchange" durable="true">
    <rabbit:bindings>
        <rabbit:binding queue="reviewQueue"></rabbit:binding>
    </rabbit:bindings>        
</rabbit:fanout-exchange>

<rabbit:direct-exchange name="directExchange" id="directExchange" durable="true" />


<rabbit:template id="reviewTemplate" connection-factory="rabbitConnectionFactory"
                 encoding="UTF-8" exchange="reviewExchange" queue="reviewQueue"       
                 routing-key="reviewKey" />

<rabbit:queue id="reviewQueue" name="reviewQueue" durable="true" />    

<bean id="customMessageListener" class="de.kimrudolph.tutorials.utils.RandomDataGenerator" />

<int:publish-subscribe-channel id="reviewPubSubChannel" />

<amqp:outbound-channel-adapter channel="reviewPubSubChannel"
                               amqp-template="reviewTemplate" exchange-name="reviewExchange"/>    

<int:channel id="reviewInboundChannel" /> 

<amqp:inbound-channel-adapter channel="reviewInboundChannel" queue-names="reviewQueue" connection-factory="rabbitConnectionFactory" />

<int:service-activator input-channel="reviewInboundChannel" id="reviewQueueServiceActivator" ref="customMessageListener" method="onMessage" />


<websocket:message-broker application-destination-prefix="/app">
    <websocket:stomp-endpoint path="/random">
        <websocket:sockjs />
    </websocket:stomp-endpoint>
    <websocket:simple-broker prefix="/data" />
    <websocket:client-inbound-channel>
        <websocket:executor core-pool-size="200" keep-alive-seconds="300" max-pool-size="1000" queue-capacity="5000" />
    </websocket:client-inbound-channel>
    <websocket:client-outbound-channel>
        <websocket:executor core-pool-size="200" keep-alive-seconds="300" max-pool-size="1000" queue-capacity="5000" />
    </websocket:client-outbound-channel>
</websocket:message-broker>

Proxy Server Configuration

worker.list=loadbalancer,status
worker.tomcat1.port=8003
worker.tomcat1.host=localhost
worker.tomcat1.type=ajp13

worker.tomcat2.port=8008
worker.tomcat2.host=localhost
worker.tomcat2.type=ajp13

worker.tomcat3.port=8013
worker.tomcat3.host=localhost
worker.tomcat3.type=ajp13

worker.tomcat1.lbfactor=1
worker.tomcat2.lbfactor=1
worker.tomcat3.lbfactor=1

worker.loadbalancer.type=lb
worker.loadbalancer.balance_workers=tomcat1,tomcat2,tomcat3
worker.loadbalancer.sticky_session=1

worker.status.type=status


JkWorkersFile conf/workers.properties
JkLogFile logs/mod_jk.log 
JkLogLevel error 
JkMount /spring-mvc-websockets-master loadbalancer 
JkMount /spring-mvc-websockets-master/* loadbalancer
JkMount /SpringChatExample loadbalancer 
JkMount /SpringChatExample/* loadbalancer

Below is a link to a sample application that you can use to reproduce the problem:

Demo Application

Upvotes: 1

Views: 844

Answers (1)

Artem Bilan

Reputation: 121272

That's correct, because only a single consumer can receive a given message from a queue.

Since all your receiver applications are configured with the same queue, the broker has only one binding on that fanout exchange.

To have every cluster member receive each message, you can use an AnonymousQueue, which you get when you provide only an id for the <rabbit:queue> definition. In that case your fanout exchange will have as many bindings as you have cluster members.

An AnonymousQueue also has the advantage of being auto-delete: when a cluster member stops, its queue and bindings are removed. In this case you should use SpEL to configure queue-names, since the generated queue name is not known in advance.
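
As a rough sketch of the anonymous-queue variant, adapted to the question's configuration: the queue is declared with only an id, the fanout binding refers to that id, and the inbound adapter resolves the generated name through SpEL. The bean ids and channel names are taken from the question; the #{reviewQueue.name} expression assumes the Queue bean's name property, and the fragment has not been tested against this application.

```xml
<!-- Only an id, no name: Spring AMQP declares an AnonymousQueue with a generated name -->
<rabbit:queue id="reviewQueue" />

<rabbit:fanout-exchange name="reviewExchange" id="reviewExchange" durable="true">
    <rabbit:bindings>
        <!-- Each cluster member binds its own queue to the shared exchange -->
        <rabbit:binding queue="reviewQueue" />
    </rabbit:bindings>
</rabbit:fanout-exchange>

<!-- Resolve the generated queue name when the application context starts -->
<amqp:inbound-channel-adapter channel="reviewInboundChannel"
                              queue-names="#{reviewQueue.name}"
                              connection-factory="rabbitConnectionFactory" />
```

With this in place, a message published to reviewExchange should be copied to one queue per running member, so each member's adapter receives its own copy.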

Or generate a random queue name and use auto-delete="true":

<bean id="inetAddress" class="java.net.InetAddress" factory-method="getLocalHost"/>

<rabbit:queue id="settingsReplyQueue" name="#{inetAddress.toString()}"
       auto-delete="true"/>

The same SpEL approach applies here for queue-names.
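
One caveat for the setup in the question: the three Tomcat workers all run on localhost, so a host-derived name would presumably be identical on every member, and they would share one queue again. A sketch of a per-member name built from a random UUID instead is below. The reviewQueue. prefix is an arbitrary choice for illustration, and the other ids come from the question.

```xml
<!-- The name is evaluated once per application context, so each member gets its own queue -->
<rabbit:queue id="reviewQueue"
              name="#{'reviewQueue.' + T(java.util.UUID).randomUUID().toString()}"
              auto-delete="true" />

<!-- Consume from whatever name was generated for this member -->
<amqp:inbound-channel-adapter channel="reviewInboundChannel"
                              queue-names="#{reviewQueue.name}"
                              connection-factory="rabbitConnectionFactory" />
```

The existing <rabbit:binding queue="reviewQueue"> on the fanout exchange refers to the bean id, so it can stay unchanged.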

Upvotes: 1
