bwillemo

Reputation: 71

Publish & Subscribe with Same Connection using Spring Integration MQTT

Since MQTT only allows one connection per unique client id, is it possible to use the same connection to both publish and subscribe in Spring Framework/Boot using Spring Integration?

Take this very simple example: it connects to the MQTT broker to subscribe and receive messages, but if you want to publish a message, the first connection will disconnect and re-connect after the message is sent.

@Bean
public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    factory.setServerURIs("tcp://localhost:1883");
    factory.setUserName("guest");
    factory.setPassword("guest");
    return factory;
}

// publisher

@Bean
public IntegrationFlow mqttOutFlow() {
    return IntegrationFlows.from(CharacterStreamReadingMessageSource.stdin(),
                    e -> e.poller(Pollers.fixedDelay(1000)))
            .transform(p -> p + " sent to MQTT")
            .handle(mqttOutbound())
            .get();
}

@Bean
public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("siSamplePublisher", mqttClientFactory());
    messageHandler.setAsync(true);
    messageHandler.setDefaultTopic("siSampleTopic");
    return messageHandler;
}

// consumer

@Bean
public IntegrationFlow mqttInFlow() {
    return IntegrationFlows.from(mqttInbound())
            .transform(p -> p + ", received from MQTT")
            .handle(logger())
            .get();
}

private LoggingHandler logger() {
    LoggingHandler loggingHandler = new LoggingHandler("INFO");
    loggingHandler.setLoggerName("siSample");
    return loggingHandler;
}

@Bean
public MessageProducerSupport mqttInbound() {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("siSampleConsumer",
            mqttClientFactory(), "siSampleTopic");
    adapter.setCompletionTimeout(5000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1);
    return adapter;
}

Working with two separate connections becomes difficult if you need to wait for an answer/result after publishing a message.

Upvotes: 5

Views: 3071

Answers (2)

iakko

Reputation: 518

TL;DR

The answer is no, not with the current Spring Integration MQTT implementation (and maybe not even with future ones).

Answer

I'm facing exactly the same situation: I need a single MQTT client used for both inbound and outbound, keeping the connection persistent and sharing the same configuration (client ID, credentials, etc.), while staying as close as possible to the Spring Integration Flows design.

To achieve this, I had to reimplement MqttPahoMessageDrivenChannelAdapter, MqttPahoMessageHandler and a Client Factory.

Both MqttPahoMessageDrivenChannelAdapter and MqttPahoMessageHandler had to use the same client type, so I settled on the async one (IMqttAsyncClient). Then I had to review the parts of the code where the client instance is used, to check whether it had already been instantiated by the other flow and what state it was in (e.g. not trying to connect it if it was already connected).

The Client Factory was easier: I reimplemented getAsyncClientInstance(String url, String clientId), using the concatenation of url and clientId, hashed, as the key under which the instance is stored in a map. The existing instance is retrieved from that map when the other flow requests it.
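The caching idea described above can be sketched in plain Java. This is only an illustration, not the reimplemented factory itself: SharedClientCache and getOrCreate are hypothetical names, the type parameter C stands in for the real client type (for example Paho's IMqttAsyncClient), and the key is the plain concatenation rather than a hash of it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Hypothetical sketch: keeps one client instance per (url, clientId) pair.
// C stands in for the real client type, e.g. Paho's IMqttAsyncClient.
class SharedClientCache<C> {

    private final Map<String, C> clients = new ConcurrentHashMap<>();
    private final BiFunction<String, String, C> creator;

    // creator builds a new client for a given url and clientId.
    SharedClientCache(BiFunction<String, String, C> creator) {
        this.creator = creator;
    }

    // Returns the cached client for this url/clientId, creating it only once.
    C getOrCreate(String url, String clientId) {
        String key = url + "|" + clientId;
        return clients.computeIfAbsent(key, k -> creator.apply(url, clientId));
    }

    // Number of distinct clients created so far.
    int size() {
        return clients.size();
    }
}
```

A factory built this way would return the same instance to the inbound and the outbound flow, so each flow still has to check the connection state before connecting, as described above.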

It works after a fashion, but it's just a test and I'm not even sure it's a good approach. (I've started another StackOverflow question to track my specific scenario.)

Can you share how you handled your situation?

Upvotes: 1

Gary Russell

Reputation: 174769

"the first connection will disconnect and re-connect after the message is sent."

Not sure what you mean by that; both components keep a persistent connection open.

The factory doesn't connect the client; the adapters do. So it's not designed for using a shared client.

Using a single connection won't really help with coordination of requests/replies because the reply will still come back asynchronously on another thread.

If you have some data in the request/reply that you can use to correlate replies with requests, you can use a BarrierMessageHandler to perform that task. See my answer here for an example. It uses the standard correlation id header, which is not possible with MQTT; you need to carry the correlation data in the message itself.
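To illustrate the correlation idea without Spring, here is a minimal plain-Java sketch. It is not BarrierMessageHandler; it only shows the same principle of matching an asynchronous reply to a waiting request by an id carried in the message. ReplyCorrelator, register, onReply and the "<requestId>:<body>" payload format are all assumptions made for this example.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: matches asynchronous replies to pending requests
// using an id embedded in the payload (MQTT has no correlation header here).
class ReplyCorrelator {

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Call before publishing; the caller embeds requestId in the outgoing payload.
    CompletableFuture<String> register(String requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // Call from the inbound flow. Assumes payloads look like "<requestId>:<body>".
    // Returns true only if a pending request was completed.
    boolean onReply(String payload) {
        int sep = payload.indexOf(':');
        if (sep < 0) {
            return false; // no correlation id in the payload
        }
        CompletableFuture<String> future = pending.remove(payload.substring(0, sep));
        if (future == null) {
            return false; // unknown or already answered request
        }
        future.complete(payload.substring(sep + 1));
        return true;
    }
}
```

The publishing thread would call register, send the message, and then wait on the returned future with a timeout (for example get(5, TimeUnit.SECONDS)), while the inbound flow calls onReply for each received payload on its own thread.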

Upvotes: 1
