Reputation: 47
I wrote my first Spring Integration application. It uses an MQTT broker to subscribe to messages on several topics, all published by a device. The device publishes the messages and the client (my code) receives them on the same topics.
I added a handler that receives the messages from the broker and passes them on to other classes. Now I want a different handler for each topic, so that each topic's messages can be mapped to their own VO class and used in the business logic.
As far as I understand, I should create only one connection to the broker and one channel, but messages from different topics should be handled by different handlers on that same connection. How can I achieve that?
@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        // SpringApplicationBuilder springApplicationBuilder = new SpringApplicationBuilder(MqttJavaApplication.class);
        SpringApplication.run(MqttJavaApplication.class, args);
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MqttPahoMessageDrivenChannelAdapter inbound() {
        String clientId = "uuid-" + UUID.randomUUID().toString();
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", clientId, "camera/status");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        // adapter.setOutputChannelName("mqttInputChannel");
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    public IntegrationFlow mqttInFlow() {
        System.out.println(Arrays.stream(SubScribeMessages.class.getMethods()).findFirst());
        return IntegrationFlows.from(inbound())
                .transform(p -> p)
                .handle("addTopics", "handleHere")
                .get();
    }

    @Component
    public class MyService {

        @Autowired
        MqttPahoMessageDrivenChannelAdapter adapter;

        @Bean
        public String addTopics() {
            if (adapter.getTopic().length > 0) {
                adapter.addTopic("camera/+/counts"); // new topic
                adapter.addTopic("camera/+/live_counts"); // new topic
            }
            return "";
        }

        // Topic "camera/+/counts" is handled here, but messages from the other topics arrive here too.
        // How do we handle the other topics in separate handlers?
        @ServiceActivator(inputChannel = "mqttInputChannel")
        public void handleHere(@Payload Object mess) throws JsonProcessingException {
            String[] topics = adapter.getTopic();
            for (String topic : topics) {
                System.out.println(topic); // How can I get the actual topic name when a wildcard is used?
            }
            ObjectMapper objectMapper = new ObjectMapper();
            String json = mess.toString();
            System.out.println(json);
            CountVo countVo = objectMapper.readValue(json, CountVo.class);
            if (countVo != null) {
                System.out.println(countVo.getIrisysCounts().get(0).getName());
            }
        }
    }
}
Additional question
How can I get the full topic name when subscribing with a wildcard, i.e. the actual topic the message was published to that the wildcard matched?
Please help.
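For reference, the adapter puts the concrete topic a message was published to in the mqtt_receivedTopic header (MqttHeaders.RECEIVED_TOPIC), even when the subscription used a wildcard. The sketch below is plain Java with no Spring or Paho dependency; TopicFilter and its methods are hypothetical names made up for illustration, and the device id "42" is invented. It shows how such a received topic could be checked against a filter like camera/+/counts and how the level matched by + could be extracted. It is a simplified matcher and ignores the special rules for topics starting with $.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Spring Integration or Paho): simplified MQTT topic filter matching. */
final class TopicFilter {

    private TopicFilter() {
    }

    /** Returns true if the concrete topic matches the filter; '+' matches one level, '#' matches the rest. */
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true; // multi-level wildcard: matches the parent level and everything below it
            }
            if (i >= t.length) {
                return false; // topic has fewer levels than the filter
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level differs
            }
        }
        return f.length == t.length; // no '#', so the level counts must agree
    }

    /** Returns the topic levels matched by each '+' in the filter, in order; empty if there is no match. */
    static List<String> captures(String filter, String topic) {
        List<String> out = new ArrayList<>();
        if (!matches(filter, topic)) {
            return out;
        }
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length && i < t.length; i++) {
            if (f[i].equals("+")) {
                out.add(t[i]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String received = "camera/42/counts"; // example value of the mqtt_receivedTopic header
        System.out.println(matches("camera/+/counts", received));  // true
        System.out.println(captures("camera/+/counts", received)); // [42]
    }
}
```

In a handler, the received topic would come from the message headers rather than from a literal string as in main above.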
Upvotes: 0
Views: 1838
Reputation: 47
Thanks Gary! I think the routing approach in your answer only works for explicitly defined topics, not for wildcards or, for that matter, any other pattern. I could not understand how dynamic routing would help me.
It turns out I can add the wildcard topics when initializing the bean and handle the messages with a service activator on the input channel that the adapter writes to.
Like this:
@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        SpringApplication.run(MqttJavaApplication.class, args);
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MqttPahoMessageDrivenChannelAdapter inbound() {
        String clientId = "uuid-" + UUID.randomUUID().toString();
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", clientId, "irisys/V4D-20230143/status");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Component
    public class MyService {

        @Autowired
        MqttPahoMessageDrivenChannelAdapter adapter;

        @Bean
        public String addTopics() {
            if (adapter.getTopic().length > 0) {
                adapter.addTopic("camera/+/counts");
            }
            return "";
        }

        @Bean
        @ServiceActivator(inputChannel = "mqttInputChannel")
        public MessageHandler handler() {
            return new MessageHandler() {
                @SneakyThrows
                @Override
                public void handleMessage(Message<?> message) throws MessagingException {
                    System.out.println(message.getPayload());
                    System.out.println(message.getHeaders());
                    // The header holds the concrete topic the message was published to.
                    if (message.getHeaders().get("mqtt_receivedTopic").toString().contains("counts")) {
                        ObjectMapper objectMapper = new ObjectMapper();
                        String json = message.getPayload().toString();
                        System.out.println(json);
                        CountVo countVo = objectMapper.readValue(json, CountVo.class);
                        if (countVo != null) {
                            System.out.println(countVo.getIrisysCounts().get(0).getName());
                        }
                    }
                }
            };
        }
    }
}
Do you think there is a better way than this? I couldn't think of any other approach.
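One thing to watch with the contains("counts") check is that it would also be true for a topic ending in live_counts. A possible alternative, sketched here in plain Java without Spring, is to register one handler per topic filter and dispatch on the received topic. TopicDispatcher, register, and dispatch are hypothetical names invented for this sketch, and turning the filter into a regular expression is just one way to do the matching; it is not something the adapter provides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.regex.Pattern;

/** Hypothetical dispatcher: passes a payload to the handler of the first topic filter that matches. */
final class TopicDispatcher {

    /** One registered filter (compiled to a regex) with its handler. */
    private static final class Route {
        final Pattern pattern;
        final Consumer<String> handler;

        Route(Pattern pattern, Consumer<String> handler) {
            this.pattern = pattern;
            this.handler = handler;
        }
    }

    private final List<Route> routes = new ArrayList<>();

    /** Registers a handler for an MQTT topic filter such as "camera/+/counts". */
    void register(String filter, Consumer<String> handler) {
        routes.add(new Route(toPattern(filter), handler));
    }

    /** Invokes the first matching handler; returns false if no filter matched. */
    boolean dispatch(String receivedTopic, String payload) {
        for (Route route : routes) {
            if (route.pattern.matcher(receivedTopic).matches()) {
                route.handler.accept(payload);
                return true;
            }
        }
        return false;
    }

    /** Translates a topic filter into a regex: '+' is one level, a trailing '#' is the rest. */
    static Pattern toPattern(String filter) {
        String[] levels = filter.split("/", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < levels.length; i++) {
            if (levels[i].equals("#")) {
                // '#' matches the parent level itself and anything below it
                regex.append(i == 0 ? ".*" : "(/.*)?");
                break;
            }
            if (i > 0) {
                regex.append('/');
            }
            regex.append(levels[i].equals("+") ? "[^/]*" : Pattern.quote(levels[i]));
        }
        return Pattern.compile(regex.toString());
    }

    public static void main(String[] args) {
        TopicDispatcher dispatcher = new TopicDispatcher();
        dispatcher.register("camera/+/counts", p -> System.out.println("counts handler: " + p));
        dispatcher.register("camera/+/live_counts", p -> System.out.println("live_counts handler: " + p));
        // In the handler above, the topic would come from the mqtt_receivedTopic header.
        dispatcher.dispatch("camera/42/live_counts", "{}");
    }
}
```

Each registered handler could then deserialize the payload into its own VO class, which keeps the single connection and single input channel.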
Upvotes: 0
Reputation: 174504
Add a router (.route(...)); you can route on the MqttHeaders.RECEIVED_TOPIC header (which contains the topic name) to different flows for each topic.
EDIT
The simplest router just maps the topic names to channel names. Here is an example:
@SpringBootApplication
public class So67391175Application {

    public static void main(String[] args) {
        SpringApplication.run(So67391175Application.class, args);
    }

    @Bean
    public DefaultMqttPahoClientFactory pahoClientFactory() {
        DefaultMqttPahoClientFactory pahoClientFactory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions connectionOptions = new MqttConnectOptions();
        connectionOptions.setServerURIs(new String[] { "tcp://localhost:1883" });
        pahoClientFactory.setConnectionOptions(connectionOptions);
        return pahoClientFactory;
    }

    @Bean
    public IntegrationFlow mqttInFlow(DefaultMqttPahoClientFactory pahoClientFactory) {
        return IntegrationFlows.from(
                new MqttPahoMessageDrivenChannelAdapter("testClient",
                        pahoClientFactory, "topic1", "topic2"))
                .route("headers['" + MqttHeaders.RECEIVED_TOPIC + "']")
                .get();
    }

    @Bean
    public IntegrationFlow flow1() {
        return IntegrationFlows.from("topic1")
                .handle((payload, headers) -> {
                    System.out.println("message from topic1 " + payload + ": " + headers);
                    return null;
                })
                .get();
    }

    @Bean
    public IntegrationFlow flow2() {
        return IntegrationFlows.from("topic2")
                .handle((payload, headers) -> {
                    System.out.println("message from topic2 " + payload + ": " + headers);
                    return null;
                })
                .get();
    }
}
message from topic1 test: {mqtt_receivedRetained=false, mqtt_id=1, mqtt_duplicate=false, id=1d950bce-aa47-7e3b-1a0d-e4d01ed707de, mqtt_receivedTopic=topic1, mqtt_receivedQos=1, timestamp=1620250633090}
message from topic2 test: {mqtt_receivedRetained=false, mqtt_id=2, mqtt_duplicate=false, id=7e9c3f51-c148-2b18-3588-ed27e93dae19, mqtt_receivedTopic=topic2, mqtt_receivedQos=1, timestamp=1620250644602}
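The example above routes on the header value as-is, so the channel name equals the topic name. With a wildcard subscription the received topic varies per device, so the routing key would need to be derived from it instead. A minimal sketch of one way to do that, assuming the last topic level identifies the message type: ChannelNames and forTopic are hypothetical names, and the topics are the ones from the question with an invented device id.

```java
/** Hypothetical helper: derives a channel name from the concrete topic a message arrived on. */
final class ChannelNames {

    private ChannelNames() {
    }

    /** Uses the last topic level as the routing key, e.g. "camera/42/counts" gives "counts". */
    static String forTopic(String receivedTopic) {
        int slash = receivedTopic.lastIndexOf('/');
        return slash < 0 ? receivedTopic : receivedTopic.substring(slash + 1);
    }

    public static void main(String[] args) {
        System.out.println(forTopic("camera/42/counts"));      // counts
        System.out.println(forTopic("camera/42/live_counts")); // live_counts
        System.out.println(forTopic("topic1"));                // topic1
    }
}
```

A function like this could supply the routing key in place of the raw header expression, with one flow per derived channel name such as counts and live_counts.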
Upvotes: 2