Reputation: 7
I am developing a demo chat application using Spring Integration, WebSocket and a RabbitMQ server. When I run the application on a single server, it works fine:
all the messages sent by the producer are received by the consumer. But when I run it in a clustered environment, each message is received by only one, seemingly random, server rather than by all servers.
I don't know whether the problem is in my code or in the configuration.
I checked the logs. They show that each message is sent successfully but is received by only one server instead of all of them.
Below are the classes I am using, along with the configuration.
ChatController.java
    @Controller
    public class ChatController {

        private Logger logger = LoggerFactory.getLogger(getClass());

        @RequestMapping(value = "/", method = RequestMethod.GET)
        public String viewApplication() {
            return "index";
        }

        @Autowired
        private AmqpTemplate reviewTemplate;

        @MessageMapping(value = "/random")
        public void sendDataUpdates(OutputMessage message) {
            try {
                System.out.println("<<<<<<< Sending Message <<<<<<<<<<" + message.getMessage() + " ID : " + message.getId());
                sendMessages(message);
            } catch (Exception ex) {
                System.out.println("Exception ------>>>>>> " + ex);
            }
        }

        private void sendMessages(OutputMessage msg) {
            reviewTemplate.convertAndSend(msg);
        }
    }
RandomDataGenerator.java
    @Component
    public class RandomDataGenerator implements
            ApplicationListener<BrokerAvailabilityEvent> {

        private final MessageSendingOperations<String> messagingTemplate;

        @Autowired
        public RandomDataGenerator(
                final MessageSendingOperations<String> messagingTemplate) {
            this.messagingTemplate = messagingTemplate;
        }

        @Override
        public void onApplicationEvent(final BrokerAvailabilityEvent event) {
        }

        public void onMessage(GenericMessage<?> msg) {
            try {
                System.out.println("Message ====== >>>>> " + msg);
                OutputMessage message = (OutputMessage) msg.getPayload();
                this.messagingTemplate.convertAndSend(
                        "/data", message);
                System.out.println("Message ====== >>>>> " + message.getMessage());
            } catch (Exception ex) {
                System.out.println("==================== " + ex);
            }
            finally {
            }
        }
    }
webapp-config.xml
    <rabbit:annotation-driven />

    <rabbit:admin id="rabbitAdmin" connection-factory="rabbitConnectionFactory" auto-startup="true" />

    <rabbit:connection-factory id="rabbitConnectionFactory"
        connection-timeout="5000" publisher-returns="true"
        channel-cache-size="32" cache-mode="CHANNEL"
        host="localhost" username="guest" password="guest" port="5672"
        publisher-confirms="true" requested-heartbeat="5000" />

    <rabbit:fanout-exchange name="reviewExchange" id="reviewExchange" durable="true">
        <rabbit:bindings>
            <rabbit:binding queue="reviewQueue"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <rabbit:direct-exchange name="directExchange" id="directExchange" durable="true" />

    <rabbit:template id="reviewTemplate" connection-factory="rabbitConnectionFactory"
        encoding="UTF-8" exchange="reviewExchange" queue="reviewQueue"
        routing-key="reviewKey" />

    <rabbit:queue id="reviewQueue" name="reviewQueue" durable="true" />

    <bean id="customMessageListener" class="de.kimrudolph.tutorials.utils.RandomDataGenerator" />

    <int:publish-subscribe-channel id="reviewPubSubChannel" />

    <amqp:outbound-channel-adapter channel="reviewPubSubChannel"
        amqp-template="reviewTemplate" exchange-name="reviewExchange"/>

    <int:channel id="reviewInboundChannel" />

    <amqp:inbound-channel-adapter channel="reviewInboundChannel" queue-names="reviewQueue" connection-factory="rabbitConnectionFactory" />

    <int:service-activator input-channel="reviewInboundChannel" id="reviewQueueServiceActivator" ref="customMessageListener" method="onMessage" />

    <websocket:message-broker application-destination-prefix="/app">
        <websocket:stomp-endpoint path="/random">
            <websocket:sockjs />
        </websocket:stomp-endpoint>
        <websocket:simple-broker prefix="/data" />
        <websocket:client-inbound-channel>
            <websocket:executor core-pool-size="200" keep-alive-seconds="300" max-pool-size="1000" queue-capacity="5000" />
        </websocket:client-inbound-channel>
        <websocket:client-outbound-channel>
            <websocket:executor core-pool-size="200" keep-alive-seconds="300" max-pool-size="1000" queue-capacity="5000" />
        </websocket:client-outbound-channel>
    </websocket:message-broker>
Proxy Server Configuration
    worker.list=loadbalancer,status
    worker.tomcat1.port=8003
    worker.tomcat1.host=localhost
    worker.tomcat1.type=ajp13
    worker.tomcat2.port=8008
    worker.tomcat2.host=localhost
    worker.tomcat2.type=ajp13
    worker.tomcat3.port=8013
    worker.tomcat3.host=localhost
    worker.tomcat3.type=ajp13
    worker.tomcat1.lbfactor=1
    worker.tomcat2.lbfactor=1
    worker.tomcat3.lbfactor=1
    worker.loadbalancer.type=lb
    worker.loadbalancer.balance_workers=tomcat1,tomcat2,tomcat3
    worker.loadbalancer.sticky_session=1
    worker.status.type=status

    JkWorkersFile conf/workers.properties
    JkLogFile logs/mod_jk.log
    JkLogLevel error
    JkMount /spring-mvc-websockets-master loadbalancer
    JkMount /spring-mvc-websockets-master/* loadbalancer
    JkMount /SpringChatExample loadbalancer
    JkMount /SpringChatExample/* loadbalancer
Below is a link to a sample application that you can use to reproduce the problem:
Upvotes: 1
Views: 844
Reputation: 121272
That's correct, because only a single consumer can receive any given message from a queue. Since all your receiver applications declare the same queue (reviewQueue), your broker has only one binding on that fanout-exchange, and the servers compete for each message that arrives on it.
To have every server receive every message, you can use an AnonymousQueue, which is what you get when you provide only an id (and no name) for the <rabbit:queue> definition. In this case your fanout-exchange will have as many bindings as you have cluster members.
The AnonymousQueue has the advantage of being auto-delete: when a cluster member stops, its queue and the queue's bindings are removed.
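The difference between one shared queue and one queue per server can be sketched with a toy in-memory model. This is only an illustration of the routing behaviour described above, not the RabbitMQ or Spring AMQP API: the class FanoutModel, the ToyQueue type and the round-robin delivery are assumptions made for the example, and the server names are taken from the workers.properties in the question.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of a fanout exchange (hypothetical; not the RabbitMQ client API). */
public class FanoutModel {

    /** A queue hands each message to exactly one of its consumers, round-robin. */
    static final class ToyQueue {
        private final List<String> consumers = new ArrayList<>();
        private int next = 0;

        void addConsumer(String server) {
            consumers.add(server);
        }

        /** Returns the name of the single consumer that receives the message. */
        String deliver() {
            String chosen = consumers.get(next);
            next = (next + 1) % consumers.size();
            return chosen;
        }
    }

    /** A fanout exchange copies each message to every bound queue. */
    static Map<String, Integer> publish(List<ToyQueue> boundQueues, int messages) {
        Map<String, Integer> received = new LinkedHashMap<>();
        for (int i = 0; i < messages; i++) {
            for (ToyQueue queue : boundQueues) {
                received.merge(queue.deliver(), 1, Integer::sum);
            }
        }
        return received;
    }

    /** All servers consume from one queue: a single binding on the exchange. */
    static Map<String, Integer> sharedQueue(List<String> servers, int messages) {
        ToyQueue shared = new ToyQueue();
        servers.forEach(shared::addConsumer);
        return publish(List.of(shared), messages);
    }

    /** Each server has its own queue: one binding per cluster member. */
    static Map<String, Integer> queuePerServer(List<String> servers, int messages) {
        List<ToyQueue> queues = new ArrayList<>();
        for (String server : servers) {
            ToyQueue queue = new ToyQueue();
            queue.addConsumer(server);
            queues.add(queue);
        }
        return publish(queues, messages);
    }

    public static void main(String[] args) {
        List<String> servers = List.of("tomcat1", "tomcat2", "tomcat3");
        System.out.println("shared queue:     " + sharedQueue(servers, 3));
        System.out.println("queue per server: " + queuePerServer(servers, 3));
    }
}
```

With three messages, the shared-queue case prints {tomcat1=1, tomcat2=1, tomcat3=1}, while the queue-per-server case prints {tomcat1=3, tomcat2=3, tomcat3=3}, which is the behaviour a chat broadcast needs.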
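In this case you should use SpEL to configure queue-names on the inbound adapter, for example queue-names="#{reviewQueue.name}", because the generated queue name is not known in advance.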
Or generate a unique queue name for each server and use auto-delete="true":
    <bean id="inetAddress" class="java.net.InetAddress" factory-method="getLocalHost"/>

    <rabbit:queue id="settingsReplyQueue" name="#{inetAddress.toString()}"
        auto-delete="true"/>
The same SpEL approach applies here for queue-names.
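One caveat with a purely host-derived name: in the workers.properties from the question all three Tomcat workers run on localhost, so InetAddress.getLocalHost() would presumably yield the same value on each of them and they would end up sharing one queue again. A minimal sketch of one way around that, assuming a random suffix per JVM is acceptable, follows. The class NodeQueueNames and its methods are hypothetical helpers, not part of Spring AMQP.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.UUID;

/** Hypothetical helper that builds a queue name unique to one running node. */
public class NodeQueueNames {

    /** Joins the parts into a dotted queue name, e.g. base.host.nodeId. */
    static String forNode(String baseName, String host, String nodeId) {
        return baseName + "." + host + "." + nodeId;
    }

    /** Uses the local host name plus a random UUID so that several JVMs on one host differ. */
    static String forThisNode(String baseName) {
        String host;
        try {
            host = InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            // Fall back to a placeholder; the UUID still keeps the name unique.
            host = "unknown-host";
        }
        return forNode(baseName, host, UUID.randomUUID().toString());
    }

    public static void main(String[] args) {
        System.out.println(forThisNode("reviewQueue"));
    }
}
```

A name built this way changes on every restart, so it only makes sense together with auto-delete="true"; otherwise stale queues would accumulate on the broker.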
Upvotes: 1