RahulNans
RahulNans

Reputation: 47

How can we use different handlers for messages received from different topics using Spring Integration?

I wrote my first Spring Integration application, which uses an MQTT broker to subscribe to messages on different topics coming from a device. The device publishes the messages and the client (my code) receives them by subscribing to the same topics.

I added a handler that receives the messages coming from the broker so that other classes can use them. Now, in my case, I want different handlers for different topics, so that each topic's messages can be mapped to a different VO class and used further in the business logic.

As far as I know, I should create only one connection to the broker and one channel, but messages from different topics will arrive on it and should be handled by different handlers on that same connection. How can I achieve that?

/**
 * Spring Boot application that subscribes to MQTT topics over one Paho
 * connection and hands every inbound message to {@link MyService#handleHere}.
 *
 * <p>Wiring: {@link #inbound()} publishes to {@link #mqttInputChannel()}, and
 * the {@code @ServiceActivator} in {@link MyService} consumes from that channel.
 */
@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        SpringApplication.run(MqttJavaApplication.class, args);
    }

    /** Channel the inbound adapter writes to and the service activator reads from. */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Inbound MQTT adapter connected to the local broker with a random client id
     * and initially subscribed to {@code camera/status}.
     */
    @Bean
    public MqttPahoMessageDrivenChannelAdapter inbound() {
        String clientId = "uuid-" + UUID.randomUUID();
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", clientId, "camera/status");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    // NOTE: the former mqttInFlow() bean was dropped. It built a second flow from
    // the same adapter (in addition to the @ServiceActivator below) and ended in
    // handle("addTopics", "handlHere"), which names the String bean returned by
    // addTopics() and a method that does not exist.

    @Component
    public class MyService {

        /** Shared mapper; created once instead of once per message. */
        private final ObjectMapper objectMapper = new ObjectMapper();

        @Autowired
        MqttPahoMessageDrivenChannelAdapter adapter;

        /**
         * Adds the wildcard subscriptions to the adapter.
         *
         * <p>Presumably declared as a bean only so that the container calls it at
         * startup; the returned empty string carries no information.
         */
        @Bean
        public String addTopics() {
            if (adapter.getTopic().length > 0) {
                adapter.addTopic("camera/+/counts"); //new topic
                adapter.addTopic("camera/+/live_counts"); //new topic
            }
            return "";
        }

        // topic "camera/+/counts" is handled here but other messages also come here, how do we handle other topics in separate handlers?
        /**
         * Prints the adapter's subscribed topic filters and the payload, then
         * parses the payload as a {@code CountVo} and prints the first count's name.
         *
         * @param mess the message payload; its {@code toString()} is parsed as JSON
         * @throws JsonProcessingException if the payload cannot be mapped to {@code CountVo}
         */
        @ServiceActivator(inputChannel = "mqttInputChannel")
        public void handleHere(@Payload Object mess) throws JsonProcessingException {
            for (String topic : adapter.getTopic()) {
                System.out.println(topic); // How can I get topic name which is using a wildcard?
            }
            String json = mess.toString();
            System.out.println(json);
            CountVo countVo = objectMapper.readValue(json, CountVo.class);
            // A real null check: the previous !countVo.equals(null) could never be
            // false and would itself throw if countVo were null. Also skip payloads
            // that carry no counts instead of failing on get(0).
            if (countVo != null
                    && countVo.getIrisysCounts() != null
                    && !countVo.getIrisysCounts().isEmpty()) {
                System.out.println(countVo.getIrisysCounts().get(0).getName());
            }
        }
    }

}

Additional Question

How can I get the full topic name when using a wildcard? That is, the actual topic the message was published to, which was matched by the wildcard subscription.

Please help.

Upvotes: 0

Views: 1838

Answers (2)

RahulNans
RahulNans

Reputation: 47

Thanks Gary! I think the routing answer you gave only works with explicitly defined topics, not with wildcards or, for that matter, any other pattern. I could not understand how dynamic routing would help me.

It turns out I can add wildcard topics when initializing the beans and handle the messages with a service activator on the input channel that the adapter writes to.

Like this:

/**
 * Spring Boot application that subscribes to MQTT topics over one Paho
 * connection and processes inbound messages in a single {@link MessageHandler},
 * using the received-topic header to decide how to treat each message.
 */
@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        SpringApplication.run(MqttJavaApplication.class, args);
    }

    /** Channel the inbound adapter writes to and the handler bean reads from. */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Inbound MQTT adapter connected to the local broker with a random client id
     * and initially subscribed to {@code irisys/V4D-20230143/status}.
     */
    @Bean
    public MqttPahoMessageDrivenChannelAdapter inbound() {
        String clientId = "uuid-" + UUID.randomUUID();
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", clientId, "irisys/V4D-20230143/status");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Component
    public class MyService {

        @Autowired
        MqttPahoMessageDrivenChannelAdapter adapter;

        /**
         * Adds the wildcard subscription to the adapter.
         *
         * <p>Presumably declared as a bean only so that the container calls it at
         * startup; the returned empty string carries no information.
         */
        @Bean
        public String addTopics() {
            if (adapter.getTopic().length > 0) {
                adapter.addTopic("camera/+/counts");
            }
            return "";
        }

        /**
         * Handler for everything arriving on {@code mqttInputChannel}. Prints the
         * payload and headers of each message; for topics ending in
         * {@code /counts} it also parses the payload as a {@code CountVo}.
         */
        @Bean
        @ServiceActivator(inputChannel = "mqttInputChannel")
        public MessageHandler handler() {
            return new MessageHandler() {

                /** Shared mapper; created once instead of once per message. */
                private final ObjectMapper objectMapper = new ObjectMapper();

                @SneakyThrows
                @Override
                public void handleMessage(Message<?> message) throws MessagingException {
                    System.out.println(message.getPayload());
                    System.out.println(message.getHeaders());

                    // Typed lookup: returns null rather than throwing when the
                    // header is missing.
                    String topic = message.getHeaders().get("mqtt_receivedTopic", String.class);

                    // endsWith("/counts") rather than contains("counts") so that a
                    // topic such as ".../live_counts" is not treated as a counts topic.
                    if (topic != null && topic.endsWith("/counts")) {
                        String json = message.getPayload().toString();
                        System.out.println(json);
                        CountVo countVo = objectMapper.readValue(json, CountVo.class);
                        // A real null check: the previous !countVo.equals(null) could
                        // never be false and would itself throw if countVo were null.
                        // Also skip payloads with no counts instead of failing on get(0).
                        if (countVo != null
                                && countVo.getIrisysCounts() != null
                                && !countVo.getIrisysCounts().isEmpty()) {
                            System.out.println(countVo.getIrisysCounts().get(0).getName());
                        }
                    }
                }

            };
        }
    }
}

Do you think there is a better way than this? I couldn't think of anything else.

Upvotes: 0

Gary Russell
Gary Russell

Reputation: 174504

Add a router (.route(...)); you can route on the MqttHeaders.RECEIVED_TOPIC header (which contains the topic name) to different flows for each topic.

https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#messaging-routing-chapter

EDIT

The simplest router just maps the topic names to channel names. Here is an example:

/**
 * Sample application: one MQTT inbound adapter subscribed to {@code topic1} and
 * {@code topic2}, with a router that uses the received-topic header value as the
 * name of the channel to send each message to. A separate flow per channel
 * prints what it receives.
 */
@SpringBootApplication
public class So67391175Application {

    public static void main(String[] args) {
        SpringApplication.run(So67391175Application.class, args);
    }

    /** Paho client factory pointing at the local broker. */
    @Bean
    public DefaultMqttPahoClientFactory pahoClientFactory() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://localhost:1883" });

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);
        return factory;
    }

    /** Inbound flow: receive from both topics and route on the received-topic header. */
    @Bean
    public IntegrationFlow mqttInFlow(DefaultMqttPahoClientFactory pahoClientFactory) {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("testClient", pahoClientFactory, "topic1", "topic2");
        String receivedTopicExpression = "headers['" + MqttHeaders.RECEIVED_TOPIC + "']";

        return IntegrationFlows.from(adapter)
                .route(receivedTopicExpression)
                .get();
    }

    /** Flow consuming the channel named {@code topic1}. */
    @Bean
    public IntegrationFlow flow1() {
        return printingFlow("topic1", "message from topic1 ");
    }

    /** Flow consuming the channel named {@code topic2}. */
    @Bean
    public IntegrationFlow flow2() {
        return printingFlow("topic2", "message from topic2 ");
    }

    /** Builds a flow that reads from the named channel and prints payload and headers after the given prefix. */
    private IntegrationFlow printingFlow(String channelName, String prefix) {
        return IntegrationFlows.from(channelName)
                .handle((payload, headers) -> {
                    System.out.println(prefix + payload + ": " + headers);
                    return null;
                })
                .get();
    }

}

message from topic1 test: {mqtt_receivedRetained=false, mqtt_id=1, mqtt_duplicate=false, id=1d950bce-aa47-7e3b-1a0d-e4d01ed707de, mqtt_receivedTopic=topic1, mqtt_receivedQos=1, timestamp=1620250633090}

message from topic2 test: {mqtt_receivedRetained=false, mqtt_id=2, mqtt_duplicate=false, id=7e9c3f51-c148-2b18-3588-ed27e93dae19, mqtt_receivedTopic=topic2, mqtt_receivedQos=1, timestamp=1620250644602}

Upvotes: 2

Related Questions