Reputation: 71
We are using STOMP, Spring Boot, and WebSockets in our application. The server application does the following: 1) generates messages to be pushed to users, 2) accepts WebSocket connections, and 3) pushes messages to the ActiveMQ STOMP broker. A thread dump shows a lot of waiting threads associated with the simpMessagingTemplate convertAndSendToUser API call.
Two instances of the application are running in a cloud environment. The application generates messages and pushes them to the ActiveMQ STOMP broker (running separately) using the simpMessagingTemplate convertAndSendToUser API.
We use Gatling, running on a separate instance, to simulate user WebSocket connections for load testing. The application works fine with 2,000 user connections. Once we increase the users to 4,000, the message generation thread stops, although users still connect to the same servers without any problem.
If we comment out the simpMessagingTemplate convertAndSendToUser API call, everything works perfectly fine (both message generation and new WebSocket connections), so we suspect the issue is with the convertAndSendToUser API.
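For reference, the STOMP relay to ActiveMQ is configured roughly like the sketch below; the endpoint path, relay host/port, and destination prefixes shown here are illustrative placeholders, not our exact values.
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // Endpoint path is illustrative.
        registry.addEndpoint("/ws").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Forward /queue and /topic destinations to the external ActiveMQ STOMP broker.
        registry.enableStompBrokerRelay("/queue", "/topic")
                .setRelayHost("activemq-host")   // placeholder host
                .setRelayPort(61613);            // default ActiveMQ STOMP port
        registry.setApplicationDestinationPrefixes("/app");
    }
}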
The thread dump stack trace is given below:
"ForkJoinPool-1-worker-440" #477 daemon prio=5 os_prio=0 tid=0x00007f0c541c2800 nid=0x2a47 sleeping[0x00007f08e6371000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at reactor.util.concurrent.WaitStrategy$Sleeping.waitFor(WaitStrategy.java:319)
at reactor.core.publisher.MonoProcessor.block(MonoProcessor.java:211)
at reactor.core.publisher.MonoProcessor.block(MonoProcessor.java:176)
at org.springframework.messaging.tcp.reactor.AbstractMonoToListenableFutureAdapter.get(AbstractMonoToListenableFutureAdapter.java:73)
at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler$SystemStompConnectionHandler.forward(StompBrokerRelayMessageHandler.java:980)
at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.handleMessageInternal(StompBrokerRelayMessageHandler.java:549)
at org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler.handleMessage(AbstractBrokerMessageHandler.java:234)
at org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:138)
at org.springframework.messaging.support.ExecutorSubscribableChannel.sendInternal(ExecutorSubscribableChannel.java:94)
at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:119)
at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:105)
at org.springframework.messaging.simp.SimpMessagingTemplate.sendInternal(SimpMessagingTemplate.java:187)
at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:162)
at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:48)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
at org.springframework.messaging.simp.user.UserDestinationMessageHandler.handleMessage(UserDestinationMessageHandler.java:227)
at org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:138)
at org.springframework.messaging.support.ExecutorSubscribableChannel.sendInternal(ExecutorSubscribableChannel.java:94)
at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:119)
at org.springframework.messaging.simp.SimpMessagingTemplate.sendInternal(SimpMessagingTemplate.java:187)
at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:162)
at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:48)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:150)
at org.springframework.messaging.simp.SimpMessagingTemplate.convertAndSendToUser(SimpMessagingTemplate.java:229)
at org.springframework.messaging.simp.SimpMessagingTemplate.convertAndSendToUser(SimpMessagingTemplate.java:218)
at org.springframework.messaging.simp.SimpMessagingTemplate.convertAndSendToUser(SimpMessagingTemplate.java:204)
at com.mypackage.PushMessageManager.lambda$sendMyMessage$2(PushMessageManager.java:77)
at com.mypackage.PushMessageManager$$Lambda$923/1850582969.accept(Unknown Source)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at com.mypackage.PushMessageManager.sendMyMessage(PushMessageManager.java:74)
at com.mypackage.PushMessageManager.lambda$processPushMessage$0(PushMessageManager.java:61)
at com.mypackage.PushMessageManager$$Lambda$664/624459498.run(Unknown Source)
at nl.talsmasoftware.context.functions.RunnableWithContext.run(RunnableWithContext.java:42)
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at nl.talsmasoftware.context.executors.ContextAwareExecutorService$1.call(ContextAwareExecutorService.java:59)
at nl.talsmasoftware.context.delegation.RunnableAdapter.run(RunnableAdapter.java:44)
at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Locked ownable synchronizers:
- None
The steps are described below with reference to the diagram. The client connect and subscribe code looks like this:
stompClient.connect({'username': $("#userName").val()}, function (frame) {
    setConnected(true);
    subscription = stompClient.subscribe('/user/queue/abc', function (message) {
        showData(JSON.parse(message.body));
    }, headers = {'loginusername': $("#userName").val()});
});
Each user should receive only the messages intended for them, not all messages. That is why we connect users to individual queues when they connect over WebSocket, and why we use convertAndSendToUser to push messages to specific sessions. The backend JMS publisher ensures that messages are published to users in a cyclic fashion.
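For context, convertAndSendToUser can target an individual WebSocket session rather than an authenticated user by passing the session id as the user name; a commonly used variant also carries the session id in the message headers so the user-destination resolver can route "/user/{sessionId}/queue/abc" to that session without a user-registry lookup. The sketch below illustrates that pattern with made-up class and variable names; our actual call is shown in the UPDATE further down.
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.SimpMessagingTemplate;

public class SessionSendExample {

    private final SimpMessagingTemplate simpMessagingTemplate;

    public SessionSendExample(SimpMessagingTemplate simpMessagingTemplate) {
        this.simpMessagingTemplate = simpMessagingTemplate;
    }

    // Treat the WebSocket session id as the "user" and carry it in the headers as well.
    public void sendToSession(String sessionId, String payload) {
        SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
        accessor.setSessionId(sessionId);
        accessor.setLeaveMutable(true);
        simpMessagingTemplate.convertAndSendToUser(sessionId, "/queue/abc", payload, accessor.getMessageHeaders());
    }
}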
To answer your question about identifying the bottleneck: if we connect, say, 2,000 users, everything works fine. But when we add more users, the JMS listener of the application can no longer keep up with the 20,000 messages per minute sent by the backend Gatling JMS load generator, and the ActiveMQ JMS queue depth increases as a result.
To confirm that the bottleneck is the convertAndSendToUser API, we commented out that API call. With it commented out, we can establish ~13k WebSocket connections and the backend JMS listener consumes all 20,000 messages per minute.
Hope this clarifies some of your questions.
UPDATE: The code snippet below shows the asynchronous invocation of the simpMessagingTemplate.convertAndSendToUser API. Here RepositoryUtil.executor() is our own wrapper around the executor object.
public CompletableFuture<Void> processPushMessage(String userName, String payload) {
    // Hand the send off to our own executor so the caller is not blocked.
    return ContextAwareCompletableFuture.runAsync(() -> {
        sendABCMessage(payload, userName);
    }, RepositoryUtil.executor());
}

public void sendABCMessage(@Payload String payload, String username) {
    ArrayList<UserProfiles> userProfiles = (ArrayList<UserProfiles>) cacheService.getValue(username);
    if (Objects.nonNull(userProfiles) && userProfiles.size() > 0) {
        // Push the payload to every cached session of this user that subscribed to /user/queue/abc.
        userProfiles.parallelStream()
                .filter(userProfiles1 -> "/user/queue/abc".equalsIgnoreCase(userProfiles1.getSubscribeMapping())
                        && username.equals(userProfiles1.getUserName()))
                .forEach(userProfiles1 ->
                        simpMessagingTemplate.convertAndSendToUser(userProfiles1.getSessionId(), "/queue/abc", payload));
    } else {
        LOGGER.info("sendABCMessage userProfiles is null. Payload: {}", payload);
    }
}
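For context, processPushMessage is invoked for each message received from the backend over JMS. A rough sketch of that wiring is below; the destination name, the header name, and the listener class are hypothetical, and only processPushMessage above is our actual code.
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class PushMessageListener {

    private final PushMessageManager pushMessageManager;

    public PushMessageListener(PushMessageManager pushMessageManager) {
        this.pushMessageManager = pushMessageManager;
    }

    // "push.messages" and the "username" header are hypothetical names.
    @JmsListener(destination = "push.messages")
    public void onPushMessage(@Header("username") String userName, @Payload String payload) {
        // Hand off asynchronously so the JMS listener thread is not blocked by the STOMP relay.
        pushMessageManager.processPushMessage(userName, payload);
    }
}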
Upvotes: 4
Views: 2032
Reputation: 71
We were able to resolve the issue by moving to /user/topic instead of /user/queue. We can now process ~35k messages per minute from the backend with 8k WebSocket user connections.
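For anyone hitting the same problem, the change amounted to switching the user destination from /queue to /topic, with the client subscription moving from '/user/queue/abc' to '/user/topic/abc'. A minimal sketch of the changed send call, following the names used in the question:
// Before: simpMessagingTemplate.convertAndSendToUser(sessionId, "/queue/abc", payload);
// After: the user destination now resolves under /topic and the client subscribes to "/user/topic/abc".
simpMessagingTemplate.convertAndSendToUser(sessionId, "/topic/abc", payload);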
Upvotes: 3
Reputation: 5018
The application works fine for 2000 user connections with a load of 20,000 messages per minute. Once we increase the users to 4000 we see that the message generation thread stops.
If you push 20,000 messages to ActiveMQ and each message has 1,000 subscribers, that means 20,000,000 messages (1,000 * 20,000) are published back to WebSocket clients. So try to determine the overall volume of messages flowing through and understand where the bottleneck is (the server forwarding messages to ActiveMQ, ActiveMQ processing messages, or the server publishing messages to WebSocket clients).
For the 20,000 messages, are they generated from a single thread, or are they sent from a large number of different threads, e.g. as a result of processing messages from WebSocket clients or REST HTTP calls? If it is the latter, it could be that too many threads are trying to forward messages to the broker concurrently, and you may have to apply some kind of rate limit.
At the end of the day, you need to understand the overall volume, identify where the bottleneck is, and decide where to apply rate limits.
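As a rough illustration of the last point, one simple way to cap how many threads forward to the broker at once is a Semaphore around the send call; the sketch below uses made-up class and method names, and the permit count is something you would have to tune.
import java.util.concurrent.Semaphore;

public class ThrottledBrokerSender {

    // Allow at most 50 concurrent forwards to the broker; tune this to what the relay can sustain.
    private final Semaphore permits = new Semaphore(50);

    public void send(Runnable forwardToBroker) throws InterruptedException {
        permits.acquire();
        try {
            // e.g. the convertAndSendToUser call
            forwardToBroker.run();
        } finally {
            permits.release();
        }
    }
}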
Upvotes: 0