jordan.baucke

Reputation: 4328

Spring WebSockets ActiveMQ convertAndSendToUser

I have a Spring Boot app (JHipster) that uses STOMP over WebSockets to push information from the server to users.

I recently added an ActiveMQ server to handle scaling the app horizontally, with an Amazon auto-scaling group / load-balancer.

I make use of the convertAndSendToUser() method, which works on a single instance of the app to locate the authenticated user's "individual queue" so that only they receive the message.

However, when I launch the app behind the load balancer, I find that messages reach the user only if the event is generated on the server where their WebSocket proxy connection (to the broker) is established.

How do I ensure the message goes through ActiveMQ to whichever instance of the app the user is actually connected to, regardless of which instance receives, say, an HTTP request that triggers the convertAndSendToUser() call?

For reference here is my StompBrokerRelayMessageHandler:

@Bean
public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
    StompBrokerRelayMessageHandler handler = (StompBrokerRelayMessageHandler) super.stompBrokerRelayMessageHandler();
    handler.setTcpClient(new Reactor2TcpClient<>(
        new StompTcpFactory(
            orgProperties.getAws().getAmazonMq().getStompRelayHost(),
            orgProperties.getAws().getAmazonMq().getStompRelayPort(),
            orgProperties.getAws().getAmazonMq().getSsl())
    ));

    return handler;
}

@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
    config.enableStompBrokerRelay("/queue", "/topic")
        .setSystemLogin(orgProperties.getAws().getAmazonMq().getStompRelayHostUser())
        .setSystemPasscode(orgProperties.getAws().getAmazonMq().getStompRelayHostPass())
        .setClientLogin(orgProperties.getAws().getAmazonMq().getStompRelayHostUser())
        .setClientPasscode(orgProperties.getAws().getAmazonMq().getStompRelayHostPass());

    config.setApplicationDestinationPrefixes("/app");
}

By examining the headers of the SessionSubscribeEvent, which is published when a user subscribes to a user queue, I found that the name of the queue generated on ActiveMQ contains the simpSessionId header value.

@Override
@EventListener({SessionSubscribeEvent.class})
public void onSessionSubscribeEvent(SessionSubscribeEvent event) {
    // Log all headers of the subscribe message, including simpSessionId and simpDestination
    log.debug("Session Subscribe Event: {}", event.getMessage().getHeaders());
}

The corresponding queues can be found in ActiveMQ, with names in the format: {simpDestination}-user{simpSessionId}

Could I save the sessionId in a key-value pair and just push messages onto that topic channel?
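
A rough sketch of that idea, not taken from Spring: UserSessionIndex and its methods are hypothetical names, and the queue name is assembled purely from the {simpDestination}-user{simpSessionId} format observed above. For this to help across instances, the map would have to live in a store shared by all of them rather than in memory as it does here.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical index of the session ids seen in subscribe events, keyed by user name. */
final class UserSessionIndex {
    private final Map<String, Set<String>> sessionsByUser = new ConcurrentHashMap<>();

    /** Remember a session id for a user (for example from a subscribe event listener). */
    void register(String user, String simpSessionId) {
        sessionsByUser
            .computeIfAbsent(user, u -> ConcurrentHashMap.newKeySet())
            .add(simpSessionId);
    }

    /** Forget a session id again (for example when the session disconnects). */
    void unregister(String user, String simpSessionId) {
        Set<String> sessions = sessionsByUser.get(user);
        if (sessions != null) {
            sessions.remove(simpSessionId);
        }
    }

    /** Builds one queue name per known session, assuming the observed naming format. */
    Set<String> queueNamesFor(String user, String simpDestination) {
        Set<String> names = new TreeSet<>();
        Set<String> sessions = sessionsByUser.getOrDefault(user, Collections.<String>emptySet());
        for (String sessionId : sessions) {
            names.add(simpDestination + "-user" + sessionId);
        }
        return names;
    }
}
```

Each name returned by queueNamesFor could then be used as a plain broker destination, at the cost of maintaining the index by hand.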


I also found that ActiveMQ-specific STOMP properties can be set in the CONNECT/SUBSCRIBE frames to create durable subscribers. If I set these properties, will Spring then understand the routing?

client-id & subcriptionName

Upvotes: 3

Views: 2015

Answers (1)

jordan.baucke

Reputation: 4328

Modifying the MessageBrokerRegistry config resolved the issue:

config.enableStompBrokerRelay("/queue", "/topic")
    // Broadcast user-destination messages that cannot be resolved locally so other instances can try
    .setUserDestinationBroadcast("/topic/registry.broadcast");

Based on this paragraph in the documentation section 4.4.13:

In a multi-application server scenario a user destination may remain unresolved because the user is connected to a different server. In such cases you can configure a destination to broadcast unresolved messages to so that other servers have a chance to try. This can be done through the userDestinationBroadcast property of the MessageBrokerRegistry in Java config and the user-destination-broadcast attribute of the message-broker element in XML
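
To make the quoted behaviour concrete, here is a toy model in plain Java. It is not Spring's implementation: BroadcastTopic and AppInstance are hypothetical stand-ins for the shared broker topic and for one app server, and the sketch only shows the flow in which an instance that cannot resolve a user locally publishes the message so that the other instances get a chance to deliver it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy stand-in for a shared broker topic that every app instance subscribes to. */
final class BroadcastTopic {
    private final List<AppInstance> subscribers = new ArrayList<>();

    void subscribe(AppInstance instance) {
        subscribers.add(instance);
    }

    /** Hands an unresolved message to every instance except the one that published it. */
    void publish(AppInstance origin, String user, String payload) {
        for (AppInstance instance : subscribers) {
            if (instance != origin) {
                instance.deliverIfLocal(user, payload);
            }
        }
    }
}

/** Toy stand-in for one application server behind the load balancer. */
final class AppInstance {
    private final Map<String, String> localSessions = new HashMap<>(); // user -> session id
    private final List<String> delivered = new ArrayList<>();
    private final BroadcastTopic topic;

    AppInstance(BroadcastTopic topic) {
        this.topic = topic;
        topic.subscribe(this);
    }

    /** Records that a user's WebSocket session lives on this instance. */
    void connect(String user, String sessionId) {
        localSessions.put(user, sessionId);
    }

    /** Delivers locally when possible; otherwise broadcasts so other instances can try. */
    void sendToUser(String user, String payload) {
        if (!deliverIfLocal(user, payload)) {
            topic.publish(this, user, payload);
        }
    }

    /** Returns true only if the user has a session on this instance. */
    boolean deliverIfLocal(String user, String payload) {
        String sessionId = localSessions.get(user);
        if (sessionId == null) {
            return false;
        }
        delivered.add(sessionId + ":" + payload);
        return true;
    }

    List<String> delivered() {
        return delivered;
    }
}
```

In this model, a message sent on an instance with no session for the user is still delivered by whichever instance holds that session, which matches the behaviour I saw after setting userDestinationBroadcast.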

I did not see any documentation on "why" /topic/registry.broadcast was the correct "topic" destination, but I am finding various iterations of it:

  1. websocket sessions sample doesn't cluster.. spring-session-1.2.2
  2. What is MultiServerUserRegistry in spring websocket?
  3. Spring websocket - sendToUser from a cluster does not work from backup server

Upvotes: 3
