Is it possible to use Amazon MQ as an external broker for Spring + WebSockets + STOMP? I have been trying with no luck. My config is as follows:
@Configuration
@EnableWebSocketMessageBroker
@AllArgsConstructor
public class WebSocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableStompBrokerRelay("/topic")
                .setRelayHost("my.amazon.stomp.endpoint").setRelayPort(61614)
                .setSystemLogin("xxxxxxxxx").setSystemPasscode("xxxxxxxxx");
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws-myapp").setAllowedOrigins("*").withSockJS();
    }
}
But when running the app, I get this from the logs (DEBUG):
2018-06-13 16:16:42.290 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : [id: 0xcd26491a] REGISTERED
2018-06-13 16:16:42.291 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : [id: 0xcd26491a] CONNECT: my.amazon.stomp.endpoint:61614
2018-06-13 16:16:42.398 INFO [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : CONNECTED: [id: 0xcd26491a, L:/my.local.ip:50392 - R:my.amazon.stomp.endpoint:61614]
2018-06-13 16:16:42.399 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : [id: 0xcd26491a, L:/my.local.ip:50392 - R:my.amazon.stomp.endpoint:61614] ACTIVE
2018-06-13 16:16:42.399 DEBUG [] 17743 --- [eactor-tcp-io-5] o.s.m.s.s.StompBrokerRelayMessageHandler : TCP connection opened in session=_system_
2018-06-13 16:16:42.404 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : [id: 0xcd26491a, L:/my.local.ip:50392 - R:my.amazon.stomp.endpoint:61614] WRITE: 94B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 43 4f 4e 4e 45 43 54 0a 61 63 63 65 70 74 2d 76 |CONNECT.accept-v|
|00000010| 65 72 73 69 6f 6e 3a 31 2e 31 2c 31 2e 32 0a 6c |ersion:1.1,1.2.l|
|00000020| 6f 67 69 6e 3a 70 61 70 65 72 6c 65 73 73 0a 70 |ogin:xxxxxxxxx.p|
|00000030| 61 73 73 63 6f 64 65 3a 4d 38 7c 42 61 6e 41 47 |asscode:xxxxxxxx|
|00000040| 4c 2d 45 61 0a 68 65 61 72 74 2d 62 65 61 74 3a |xxxx.heart-beat:|
|00000050| 31 30 30 30 30 2c 31 30 30 30 30 0a 0a 00 |10000,10000... |
+--------+-------------------------------------------------+----------------+
2018-06-13 16:16:42.405 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : [id: 0xcd26491a, L:/my.local.ip:50392 - R:my.amazon.stomp.endpoint:61614] FLUSH
2018-06-13 16:16:42.406 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : [id: 0xcd26491a, L:/my.local.ip:50392 - R:my.amazon.stomp.endpoint:61614] USER_EVENT: reactor.io.net.impl.netty.NettyChannelHandlerBridge$ChannelInputSubscriberEvent@37c47287
2018-06-13 16:16:42.512 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : [id: 0xcd26491a, L:/my.local.ip:50392 - R:my.amazon.stomp.endpoint:61614] READ: 7B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 15 03 03 00 02 02 0a |....... |
+--------+-------------------------------------------------+----------------+
2018-06-13 16:16:42.513 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : [id: 0xcd26491a, L:/my.local.ip:50392 - R:my.amazon.stomp.endpoint:61614] READ COMPLETE
2018-06-13 16:16:42.514 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : [id: 0xcd26491a, L:/my.local.ip:50392 - R:my.amazon.stomp.endpoint:61614] READ COMPLETE
2018-06-13 16:16:42.623 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : [id: 0xcd26491a, L:/my.local.ip:50392 ! R:my.amazon.stomp.endpoint:61614] INACTIVE
2018-06-13 16:16:42.624 DEBUG [] 17743 --- [eactor-tcp-io-5] o.s.m.s.s.StompBrokerRelayMessageHandler : TCP connection to broker closed in session _system_
2018-06-13 16:16:42.624 DEBUG [] 17743 --- [eactor-tcp-io-5] o.s.m.s.s.StompBrokerRelayMessageHandler : Cleaning up connection state for session _system_
2018-06-13 16:16:42.624 INFO [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : CLOSED: [id: 0xcd26491a, L:/my.local.ip:50392 ! R:my.amazon.stomp.endpoint:61614]
2018-06-13 16:16:42.625 INFO [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : Failed to connect to reactor.io.net.impl.netty.tcp.NettyTcpClient$ReconnectingChannelListener$3@1ad2dfa9. Attempting reconnect in 5000ms.
2018-06-13 16:16:42.625 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient : [id: 0xcd26491a, L:/my.local.ip:50392 ! R:my.amazon.stomp.endpoint:61614] UNREGISTERED
And it doesn't connect. Any ideas? The same configuration works perfectly against a local ActiveMQ instance. The difference is that the local broker uses a tcp://host URI, whereas Amazon provides a stomp+ssl://host URI.
That said, I'm not specifying the protocol anywhere. It seems to connect, but it never receives a reply to the CONNECT frame. I'm using the same user and passcode that I use to log in to the admin console. I can connect from JavaScript to the wss:// endpoint, but I need to set Amazon MQ up as the external broker for a Spring Boot app.
Upvotes: 2
In the end, the problem was that SSL was not enabled on the relay's connection to the broker. Here's how I fixed it:
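The 7 bytes the broker sends back in the question's log (15 03 03 00 02 02 0a) are consistent with this diagnosis: read as a TLS record, they look like a fatal alert rather than a STOMP frame. A minimal sketch of that decoding follows. TlsAlertDecoder is a hypothetical helper written for this illustration, not part of Spring or Reactor, and it only recognizes a handful of alert codes from RFC 5246.

```java
final class TlsAlertDecoder {

    // Decodes a single TLS alert record: type(1) version(2) length(2) level(1) description(1).
    static String decode(byte[] record) {
        // 0x15 is the TLS record content type for "alert"
        if (record.length < 7 || (record[0] & 0xFF) != 0x15) {
            return "not a TLS alert record";
        }
        int major = record[1] & 0xFF;
        int minor = record[2] & 0xFF;
        int length = ((record[3] & 0xFF) << 8) | (record[4] & 0xFF);
        int level = record[5] & 0xFF;
        int description = record[6] & 0xFF;
        return String.format("TLS alert: version=%d.%d length=%d level=%s description=%s",
                major, minor, length, levelName(level), descriptionName(description));
    }

    private static String levelName(int level) {
        switch (level) {
            case 1: return "warning";
            case 2: return "fatal";
            default: return "unknown(" + level + ")";
        }
    }

    // Only a subset of the alert descriptions defined in RFC 5246, section 7.2
    private static String descriptionName(int code) {
        switch (code) {
            case 0: return "close_notify";
            case 10: return "unexpected_message";
            case 20: return "bad_record_mac";
            case 40: return "handshake_failure";
            case 42: return "bad_certificate";
            case 70: return "protocol_version";
            case 80: return "internal_error";
            default: return "unknown(" + code + ")";
        }
    }

    public static void main(String[] args) {
        // Bytes copied from the READ: 7B entry in the question's log
        byte[] fromLog = {0x15, 0x03, 0x03, 0x00, 0x02, 0x02, 0x0a};
        System.out.println(decode(fromLog));
    }
}
```

For the bytes in the log this reports a fatal unexpected_message alert with record version 3.3 (TLS 1.2), which is a plausible reaction from a TLS endpoint that receives a plaintext STOMP CONNECT frame instead of a TLS handshake.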
1) I created a custom StompTcpFactory, to create a tcp client with SSL enabled:
/**
 * A TCP Client Factory to enable SSL connection
 */
public class StompTcpFactory implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {

    private final Environment environment;
    private final EventLoopGroup eventLoopGroup;
    private final boolean ssl;
    private final List<String> addresses;

    public StompTcpFactory(List<String> addresses, boolean ssl) {
        this.addresses = addresses;
        this.ssl = ssl;
        this.environment = new Environment(
                () -> new ReactorConfiguration(Collections.emptyList(), "sync", new Properties()));
        this.eventLoopGroup = Reactor2TcpClient.initEventLoopGroup();
    }

    @Override
    public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(
            Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {
        Supplier<InetSocketAddress> supplier = new InetSocketAddressSupplier(addresses);
        return tcpClientSpec
                .env(environment)
                .options(new NettyClientSocketOptions().eventLoopGroup(eventLoopGroup))
                .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
                // Passing SslOptions enables SSL; null leaves the connection in plain TCP
                .ssl(ssl ? new SslOptions() : null)
                .connect(supplier);
    }
}
Notice the .ssl(...) call, which turns SSL on for the client. The factory takes a list of addresses to configure the endpoints, plus a boolean so that SSL can be switched on and off (for dev). The InetSocketAddressSupplier was created to alternate between the two addresses that Amazon gives you when working with an HA setup:
/**
 * Address supplier for failover connection
 */
@Slf4j
public class InetSocketAddressSupplier implements Supplier<InetSocketAddress> {

    // Static, so the rotation is shared across all supplier instances
    private static int counter = 0;
    private final List<String> addresses;

    InetSocketAddressSupplier(List<String> addresses) {
        Assert.notNull(addresses, "addresses list cannot be null");
        Assert.isTrue(addresses.size() == 2, "Addresses list must be of size 2");
        this.addresses = addresses;
    }

    @Override
    public InetSocketAddress get() {
        // Alternate between the two configured "host:port" entries on each call
        int serverIndex = counter % 2;
        counter++;
        String[] info = addresses.get(serverIndex).split(":");
        log.debug("Returning server {} {}:{} for connection", serverIndex, info[0], info[1]);
        return new InetSocketAddress(info[0], Integer.parseInt(info[1]));
    }
}
If you don't need the HA setup, you can simply create an inline supplier, as shown in the Spring docs.
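The supplier above keeps its position in a plain static int, which is not safe if several threads reconnect at once, and it is fixed to exactly two addresses. A minimal sketch of a variant that rotates over any non-empty list using an AtomicInteger is shown below. RoundRobinAddressSupplier is a hypothetical name, and the sketch makes two assumptions to stay dependency-free: it implements java.util.function.Supplier (the Reactor 2 API used above may expect its own Supplier type), and it builds unresolved addresses so it runs without DNS. Because the counter is per instance here, one instance would have to be shared rather than created on every apply() call.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class RoundRobinAddressSupplier implements Supplier<InetSocketAddress> {

    private final List<String> addresses;
    // Atomic counter so concurrent callers each get a distinct position
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinAddressSupplier(List<String> addresses) {
        if (addresses == null || addresses.isEmpty()) {
            throw new IllegalArgumentException("addresses must contain at least one host:port entry");
        }
        // Defensive copy so later changes to the caller's list do not affect rotation
        this.addresses = new ArrayList<>(addresses);
    }

    @Override
    public InetSocketAddress get() {
        // floorMod keeps the index non-negative even after the counter overflows
        int index = Math.floorMod(counter.getAndIncrement(), addresses.size());
        String entry = addresses.get(index);
        int colon = entry.lastIndexOf(':');
        if (colon <= 0 || colon == entry.length() - 1) {
            throw new IllegalArgumentException("expected host:port but got: " + entry);
        }
        String host = entry.substring(0, colon);
        int port = Integer.parseInt(entry.substring(colon + 1));
        // Assumption: unresolved address, to avoid a DNS lookup in this sketch;
        // the answer's code uses new InetSocketAddress(host, port) instead.
        return InetSocketAddress.createUnresolved(host, port);
    }
}
```

Calling get() repeatedly on one instance walks through the list in order and then wraps around to the first entry.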
2) You can then configure your websocket endpoint to use this factory to create the client, and also set the user/password for the system and client connections:
@Slf4j
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer {

    @Qualifier("websocket")
    private MessageBrokerProperties config;
    private InetSocketAddressSupplier supplier;

    public WebSocketConfiguration(MessageBrokerProperties config) {
        this.config = config;
        this.supplier = new InetSocketAddressSupplier(config.getAddresses());
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        InetSocketAddress address = supplier.get();
        registry.enableStompBrokerRelay("/topic")
                .setRelayHost(address.getHostName()).setRelayPort(address.getPort())
                .setSystemLogin(config.getSystemLogin()).setSystemPasscode(config.getSystemPasscode())
                .setClientLogin(config.getClientLogin()).setClientPasscode(config.getClientPasscode())
                // Use the SSL-capable client instead of the default TCP client
                .setTcpClient(createTcpClient());
        registry.setApplicationDestinationPrefixes("/app");
    }

    private TcpOperations<byte[]> createTcpClient() {
        return new Reactor2TcpClient<>(new StompTcpFactory(config.getAddresses(), config.isUseSSL()));
    }
}
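The MessageBrokerProperties class used above is not shown in this answer. A minimal sketch of what it might look like follows, inferred only from the getters the configuration calls. The field defaults and the setters are assumptions, and in a Spring Boot app such a class would typically be bound from application properties (for example with @ConfigurationProperties), which is left out here to keep the sketch free of dependencies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical holder for the broker settings read by WebSocketConfiguration
final class MessageBrokerProperties {

    // "host:port" entries; the supplier above expects exactly two for an HA setup
    private List<String> addresses = new ArrayList<>();
    // Assumed default: SSL on, since Amazon MQ exposes a stomp+ssl endpoint
    private boolean useSSL = true;
    private String systemLogin;
    private String systemPasscode;
    private String clientLogin;
    private String clientPasscode;

    public List<String> getAddresses() { return addresses; }
    public void setAddresses(List<String> addresses) { this.addresses = addresses; }

    public boolean isUseSSL() { return useSSL; }
    public void setUseSSL(boolean useSSL) { this.useSSL = useSSL; }

    public String getSystemLogin() { return systemLogin; }
    public void setSystemLogin(String systemLogin) { this.systemLogin = systemLogin; }

    public String getSystemPasscode() { return systemPasscode; }
    public void setSystemPasscode(String systemPasscode) { this.systemPasscode = systemPasscode; }

    public String getClientLogin() { return clientLogin; }
    public void setClientLogin(String clientLogin) { this.clientLogin = clientLogin; }

    public String getClientPasscode() { return clientPasscode; }
    public void setClientPasscode(String clientPasscode) { this.clientPasscode = clientPasscode; }
}
```

Keeping the SSL switch in configuration is what lets the same code run against a local plain-TCP ActiveMQ in dev and the SSL endpoint in production.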
Hope it helps...
Upvotes: 4