Phoste
Phoste

Reputation: 1209

How can I use Group Timeout with the Java Configuration of Aggregator?

I would like to use the Aggregator Endpoint in SI to aggregate MQTT messages based on the message topic and release the aggregated message when I received all the parts (some gyroscopic values : X, Y and Z) thus far no problem... It works. But I would like to add a group timeout so that when I don't receive all 3 values whithin a period of time the messages are discarded and I can wait for new messages.

My working code :

Configuration :

@SpringBootApplication
public class MqttListenerApplication {

    public static void main(String[] args) {
        SpringApplication.run(MqttListenerApplication.class, args);
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel filterOutputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel aggregatorOutputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MqttPahoClientFactory clientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("demo:application");
        options.setPassword("PwdApps".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://mqtt2.thingsplay.com:1883", "test-007", clientFactory(),"#");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
}

Filter Endpoint :

@MessageEndpoint
public class MqttFilter {

    @Filter(
            inputChannel = "mqttInputChannel",
            outputChannel = "filterOutputChannel"
    )
    public boolean isValid(@Header("mqtt_receivedTopic") String topic, Message<?> message) {
        if (topic.contains("testbw")) {
            System.out.println("------ Valid Message ! ------");
            return true;
        } else {
            return false;
        }
    }

}

Aggregator Endpoint :

@MessageEndpoint
public class GyroAggregator {

    private static final Logger logger = LogManager.getLogger();

    @Aggregator(
            inputChannel = "filterOutputChannel",
            outputChannel = "aggregatorOutputChannel"
    )
    public GyroCompleted aggregate(List<Message<?>> messages) {
        GyroCompleted gyroCompleted = new GyroCompleted();
        for (Message<?> message : messages) {
            String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
            if (topic.contains("ACCX")) {
                gyroCompleted.setAcc_x(Integer.valueOf((String) message.getPayload()));
            } else if (topic.contains("ACCY")) {
                gyroCompleted.setAcc_y(Integer.valueOf((String) message.getPayload()));
            } else if (topic.contains("ACCZ")) {
                gyroCompleted.setAcc_z(Integer.valueOf((String) message.getPayload()));
            }
        }
        return gyroCompleted;
    }

    @ReleaseStrategy
    public boolean hasAllAxes(List<Message<?>> messages) {
        logger.debug("In Release Strategy method.");
        logger.debug(messages);
        boolean x = false, y = false, z = false;
        for (Message<?> message : messages) {
            String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
            if (topic.contains("ACCX")) {
                x = true;
            } else if (topic.contains("ACCY")) {
                y = true;
            } else if (topic.contains("ACCZ")) {
                z = true;
            }
        }
        logger.debug("Release Strategy method returning {}", x && y && z);
        return x && y && z;
    }

    @CorrelationStrategy
    public String correlateBy(@Header("mqtt_receivedTopic") String topic, Message<?> message) {
        logger.debug("In Correlation Strategy method.");
        String deviceId = topic.substring(0, topic.indexOf("/"));
        logger.debug("Correlation Strategy returning Key : {}", deviceId);
        return deviceId;
    }

}

Echo Endpoint :

@MessageEndpoint
public class EchoServiceActivator {

    private static final Logger logger = LogManager.getLogger();

    @ServiceActivator(
            inputChannel = "aggregatorOutputChannel"
    )
    public void echo(Message<?> message) {
        logger.debug("Echo : " + message);
    }

}

But for the group timeout point, I can't make it work... There is no configuration through the annotation despite the doc is saying this :

All of the configuration options provided by the xml element are also available for the @Aggregator annotation.

But a few lines below it's saying this :

Annotation configuration (@Aggregator and others) for the Aggregator component covers only simple use cases, where most default options are sufficient. If you need more control over those options using Annotation configuration, consider using a @Bean definition for the AggregatingMessageHandler and mark its @Bean method with @ServiceActivator

The problem is that I can't make that @Bean work...

AggregatingMessageHandler

I tried to put it in a class annotated with @MessageEndpoint but it won't work either. I thought that it would autowire all the component for the aggregator.

How can I make it work ?

Upvotes: 0

Views: 406

Answers (1)

Gary Russell
Gary Russell

Reputation: 174484

It's much easier to use the Java DSL. Something like:

@Bean
public IntegrationFlow aggregatorFlow(GyroAggregator agg) {
    return IntegrationFlows.from("filterOutputChannel")
            .aggregate(a -> a
                            .processor(agg)
                            .groupTimeout(500L))
            .channel("aggregatorOutputChannel")
            .get();
}

Of course, you can wire the MQTT adapter and filter into the same flow.

If you want to define the handler as a @Bean use new SimpleMessageStore() in the ctor.

Upvotes: 1

Related Questions