Yurii Buhryn
Yurii Buhryn

Reputation: 823

Spring MQTT Java Config example issue

Where can I find an example of how to work with MQTT + Java Config?

This is not working for me: http://docs.spring.io/spring-integration/reference/html/mqtt.html

Upvotes: 3

Views: 4571

Answers (3)

bittap
bittap

Reputation: 548

I tried the example from http://docs.spring.io/spring-integration/reference/html/mqtt.html
and it works well. Here is my source:

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

/**
 * Spring Boot application that wires an MQTT inbound adapter, an MQTT outbound
 * handler and a messaging gateway purely with Java configuration.
 *
 * <p>Both the inbound adapter and the outbound handler use the topic
 * {@code "testTopic"} on the broker configured in {@link #mqttClientFactory()}.
 */
@SpringBootApplication
public class MqttApplication {

    /**
     * Starts the application context and sends the payload {@code "foo"}
     * through the {@link MyGateway} bean.
     *
     * @param args command-line arguments passed on to {@link SpringApplication#run}
     */
    public static void main(String[] args) {
        ConfigurableApplicationContext ctx = SpringApplication.run(MqttApplication.class, args);
        ctx.getBean(MyGateway.class).sendToMqtt("foo");
    }

    /**
     * Channel that the inbound adapter writes to and that {@link #handler()} reads from.
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Client factory shared by the inbound adapter and the outbound handler;
     * it points at {@code tcp://localhost:1883}.
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setServerURIs(new String[] { "tcp://localhost:1883" });

        DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
        clientFactory.setConnectionOptions(connectOptions);
        return clientFactory;
    }

    /**
     * Inbound adapter registered with client id {@code "testMqtt"} for the topic
     * {@code "testTopic"}; its output goes to {@link #mqttInputChannel()}.
     */
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter mqttAdapter =
                new MqttPahoMessageDrivenChannelAdapter("testMqtt", mqttClientFactory(), "testTopic");
        mqttAdapter.setOutputChannel(mqttInputChannel());
        mqttAdapter.setQos(1);
        mqttAdapter.setConverter(new DefaultPahoMessageConverter());
        mqttAdapter.setCompletionTimeout(5000);
        return mqttAdapter;
    }

    /**
     * Prints the payload of every message arriving on {@code mqttInputChannel}
     * to standard output.
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return received -> System.out.println(received.getPayload());
    }

    /**
     * Outbound handler (client id {@code "testClient"}) attached to
     * {@code mqttOutboundChannel}, with {@code "testTopic"} as its default topic.
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler publisher = new MqttPahoMessageHandler("testClient", mqttClientFactory());
        publisher.setDefaultTopic("testTopic");
        publisher.setAsync(true);
        return publisher;
    }

    /**
     * Channel that {@link MyGateway} sends to and that {@link #mqttOutbound()} consumes.
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Gateway whose default request channel is {@code mqttOutboundChannel}.
     */
    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        /**
         * Sends {@code data} to the gateway's default request channel.
         *
         * @param data payload to send
         */
        void sendToMqtt(String data);

    }

}

Upvotes: 1

Gary Russell
Gary Russell

Reputation: 174769

I am glad you have found a solution.

I created a sample app that uses the Java DSL to read from stdin, send to MQTT, receive and log.

Here are the pertinent bits:

// publisher

/**
 * Publisher flow: polls standard input with {@code Pollers.fixedDelay(1000)},
 * appends {@code " sent to MQTT"} to each payload and passes the result to
 * {@link #mqttOutbound()}.
 */
@Bean
public IntegrationFlow mqttOutFlow() {
    final String sentSuffix = " sent to MQTT";
    return IntegrationFlows
            .from(CharacterStreamReadingMessageSource.stdin(),
                    endpoint -> endpoint.poller(Pollers.fixedDelay(1000)))
            .transform(payload -> payload + sentSuffix)
            .handle(mqttOutbound())
            .get();
}

/**
 * Outbound MQTT handler created with client id {@code "siSamplePublisher"},
 * async mode enabled and {@code "siSampleTopic"} as its default topic.
 */
@Bean
public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler publisher =
            new MqttPahoMessageHandler("siSamplePublisher", mqttClientFactory());
    publisher.setDefaultTopic("siSampleTopic");
    publisher.setAsync(true);
    return publisher;
}

// consumer

/**
 * Consumer flow: takes messages from {@link #mqttInbound()}, appends
 * {@code ", received from MQTT"} to each payload and hands the result to
 * the handler returned by {@link #logger()}.
 */
@Bean
public IntegrationFlow mqttInFlow() {
    final String receivedSuffix = ", received from MQTT";
    return IntegrationFlows
            .from(mqttInbound())
            .transform(payload -> payload + receivedSuffix)
            .handle(logger())
            .get();
}

/**
 * Creates a {@code LoggingHandler} constructed with level {@code "INFO"} and
 * logger name {@code "siSample"}.
 */
private LoggingHandler logger() {
    LoggingHandler handler = new LoggingHandler("INFO");
    handler.setLoggerName("siSample");
    return handler;
}

/**
 * Inbound MQTT adapter created with client id {@code "siSampleConsumer"} for
 * the topic {@code "siSampleTopic"}, using a {@code DefaultPahoMessageConverter},
 * QoS 1 and a completion timeout of 5000.
 */
@Bean
public MessageProducerSupport mqttInbound() {
    MqttPahoMessageDrivenChannelAdapter consumer =
            new MqttPahoMessageDrivenChannelAdapter("siSampleConsumer", mqttClientFactory(), "siSampleTopic");
    consumer.setQos(1);
    consumer.setConverter(new DefaultPahoMessageConverter());
    consumer.setCompletionTimeout(5000);
    return consumer;
}

.

foo
14:40:56.770 [MQTT Call: siSampleConsumer] INFO  siSample - foo sent to MQTT, received from MQTT

EDIT

The official Spring Integration MQTT Sample with Annotations & DSL Configuration is located here: https://github.com/spring-projects/spring-integration-samples/tree/master/basic/mqtt

Upvotes: 3

Yurii Buhryn
Yurii Buhryn

Reputation: 823

Resolved by using Spring Boot

/**
 * Spring Boot configuration that wires an MQTT inbound adapter, an MQTT
 * outbound handler and a messaging gateway, all using the topic {@code "test"}.
 */
@Configuration
@ComponentScan
@EnableAutoConfiguration
@IntegrationComponentScan
public class Application {

  /**
   * Starts the Spring application.
   *
   * @param args command-line arguments passed on to {@link SpringApplication#run}
   */
  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }

  /**
   * Channel that the inbound adapter writes to and that {@link #handler()} reads from.
   */
  @Bean
  public MessageChannel mqttInputChannel() {
    return new DirectChannel();
  }

  /**
   * Client factory shared by the inbound adapter and the outbound handler.
   * The server URI, user name and password here are placeholder values.
   */
  @Bean
  public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory();
    clientFactory.setUserName("username");
    clientFactory.setPassword("password");
    clientFactory.setServerURIs("tcp://url:10423");
    return clientFactory;
  }

  /**
   * Inbound adapter registered with client id {@code "testMqtt"} for the topic
   * {@code "test"}; its output goes to {@link #mqttInputChannel()}.
   */
  @Bean
  public MessageProducer inbound() {
    MqttPahoMessageDrivenChannelAdapter mqttAdapter =
            new MqttPahoMessageDrivenChannelAdapter("testMqtt", mqttClientFactory(), "test");
    mqttAdapter.setOutputChannel(mqttInputChannel());
    mqttAdapter.setQos(1);
    mqttAdapter.setConverter(new DefaultPahoMessageConverter());
    mqttAdapter.setCompletionTimeout(5000);
    return mqttAdapter;
  }

  /**
   * Prints the payload of every message arriving on {@code mqttInputChannel}
   * to standard output, prefixed with a row of exclamation marks.
   */
  @Bean
  @ServiceActivator(inputChannel = "mqttInputChannel")
  public MessageHandler handler() {
    return received -> System.out.println("!!!!!!!!!!!!!!!!!!!" + received.getPayload());
  }

  /**
   * Outbound handler (client id {@code "testClient"}) attached to
   * {@code mqttOutboundChannel}, with {@code "test"} as its default topic.
   */
  @Bean
  @ServiceActivator(inputChannel = "mqttOutboundChannel")
  public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler publisher =
            new MqttPahoMessageHandler("testClient", mqttClientFactory());
    publisher.setDefaultTopic("test");
    publisher.setAsync(true);
    return publisher;
  }

  /**
   * Channel that {@link MyGateway} sends to and that {@link #mqttOutbound()} consumes.
   */
  @Bean
  public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
  }

  /**
   * Gateway whose default request channel is {@code mqttOutboundChannel}.
   */
  @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
  public interface MyGateway {

    /**
     * Sends {@code data} to the gateway's default request channel.
     *
     * @param data payload to send
     */
    void sendToMqtt(String data);

  }

}

Upvotes: 5

Related Questions