Raj

Reputation: 50

MQTT request-response behavior using aggregation with multiple containers

I have implemented a request-response pattern over an MQTT broker using the Spring Integration aggregator. In my use case, if I don't receive a response within 2 minutes, the request needs to time out.

    @Bean
    IntegrationFlow mqttHandshakeInFlow(
            ClientManager<IMqttAsyncClient, MqttConnectionOptions> iotClientManager) {
        Mqttv5PahoMessageDrivenChannelAdapter messageDrivenChannelAdapter =
            new Mqttv5PahoMessageDrivenChannelAdapter(iotClientManager, "IOT/responseTopic");
        messageDrivenChannelAdapter.setQos(1);
        return IntegrationFlow.from(messageDrivenChannelAdapter)
                .channel("aggregatorInputChannel")
                .get();
    }

    @Bean
    IntegrationFlow aggregatorFlow() {
        return IntegrationFlow
                .from(aggregatorInputChannel())
                .aggregate(a -> a
                        .correlationStrategy(m -> {
                            String correlationId = (String) m.getHeaders().get("correlationId");
                            System.out.println("CorrelationId: " + correlationId);
                            return correlationId;
                        })
                        // Release once the request and its response are both in the group.
                        .releaseStrategy(r -> r.size() == 2)
                        .expireGroupsUponCompletion(true)
                        // Purge groups older than 2 minutes; the purge task runs every 2 minutes.
                        .expireTimeout(120000)
                        .expireDuration(Duration.ofMinutes(2)))
                .channel("handShakeResponseChannel")
                .get();
    }
   
    @Bean
    PollableChannel handShakeResponseChannel() {
       return new QueueChannel();
    }

Below is a controller I use to test the changes:

    @GetMapping
    public String getMethodName(@RequestParam String message) {
        // Seed the aggregator group with the request so that the response completes it.
        Message<String> requestMessage = MessageBuilder.withPayload(message)
                .setHeader("mqtt_correlationData", "someIdxxx")
                .setHeader("correlationId", "someIdxxx")
                .setHeader("replyChannel", "handShakeResponseChannel")
                .build();
        aggregatorInputChannel().send(requestMessage);
        mqttGateway.send("IOT/requestTopic", message);

        // Wait up to 60 seconds for the aggregated (request + response) group.
        Message<?> aggregated = handShakeResponseChannel().receive(60000);
        if (aggregated == null) {
            return "FAILURE";
        }
        List<?> group = (List<?>) aggregated.getPayload();
        System.out.println(group.size());
        System.out.println(aggregated.getHeaders().get("correlationId"));
        return "SUCCESS";
    }

On a single container instance this works fine. In production, however, we will have n containers, so the question is: how will it behave?

Assuming I'm not using a $share (shared subscription) response topic, will all the subscribers/containers receive the response, with only one container matching the criteria? Wouldn't that be overkill for the application, since the other containers, which don't need the message, would also have to keep it in the message store for some time? Is there a better approach?
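
To make the fan-out concern concrete, here is a minimal plain-Java model of it. It uses no Spring Integration or MQTT classes; `FanOutModel`, `Container`, `register` and `onResponse` are hypothetical names invented for this sketch. It assumes a non-shared subscription, where the broker delivers a copy of each response to every subscribed container. Each container tracks only the correlation IDs of requests it sent itself and drops any response whose ID it does not know, so nothing is stored for foreign responses. This is one possible alternative under those assumptions, not a statement about how the aggregator behaves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FanOutModel {

    /** Hypothetical stand-in for one application container. */
    public static final class Container {
        // Only requests sent by THIS container are registered here.
        private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
        private final AtomicInteger discarded = new AtomicInteger();

        /** Registers a request; the returned future fails once the timeout elapses. */
        public CompletableFuture<String> register(String correlationId, long timeout, TimeUnit unit) {
            CompletableFuture<String> reply = new CompletableFuture<>();
            pending.put(correlationId, reply);
            return reply.orTimeout(timeout, unit)
                    // Clean up on timeout as well as on a normal reply.
                    .whenComplete((value, error) -> pending.remove(correlationId));
        }

        /** Called for every message seen on the response topic. */
        public void onResponse(String correlationId, String payload) {
            CompletableFuture<String> reply = pending.remove(correlationId);
            if (reply == null) {
                discarded.incrementAndGet(); // someone else's reply: nothing is stored
            } else {
                reply.complete(payload);
            }
        }

        public int discardedCount() { return discarded.get(); }
        public int pendingCount() { return pending.size(); }
    }

    /** One request from container 0, response copied to all; returns discards per container. */
    public static List<Integer> simulate(int containers) {
        List<Container> cluster = new ArrayList<>();
        for (int i = 0; i < containers; i++) {
            cluster.add(new Container());
        }
        CompletableFuture<String> reply =
                cluster.get(0).register("someIdxxx", 2, TimeUnit.MINUTES);
        for (Container c : cluster) {
            c.onResponse("someIdxxx", "ack"); // non-shared subscription: everyone gets a copy
        }
        List<Integer> discards = new ArrayList<>();
        for (Container c : cluster) {
            discards.add(c.discardedCount());
        }
        System.out.println("reply=" + reply.join() + " discards=" + discards);
        return discards;
    }

    public static void main(String[] args) {
        simulate(3);
    }
}
```

With three containers, the container that sent the request gets the reply and the other two each discard one message. With a $share subscription the broker would instead hand each response to only one member of the group, which is not necessarily the container that sent the request, so a per-container map like this would not be enough on its own.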

Also, in our implementation we will get a burst of requests (100k) every 30 minutes, so any suggestions on which message store to use?
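
For a rough sense of scale, the sketch below counts how many responses would land on a container that did not send the matching request. It assumes one response per request and that every container receives every response (no $share). `OrphanEstimate`, `orphanedResponses` and the container count of 5 are hypothetical, chosen only for illustration. With the aggregator above releasing at a group size of 2, each such response would presumably sit in a group of size 1 until it is expired.

```java
public class OrphanEstimate {

    /**
     * Responses that reach a container which did not send the matching request,
     * assuming one response per request and every container receiving every response.
     */
    public static long orphanedResponses(long requests, int containers) {
        // Each response is useful to exactly one container and an orphan on all the others.
        return requests * (containers - 1L);
    }

    public static void main(String[] args) {
        long burst = 100_000;  // burst size from the question
        int containers = 5;    // hypothetical value for n
        System.out.println(orphanedResponses(burst, containers)); // prints 400000
    }
}
```

The total does not depend on how the burst is spread across containers, because every response is wanted by exactly one of them.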

Upvotes: 0

Views: 51

Answers (0)
