Petr

Reputation: 45

Spring integration poller

Good day! Sorry for my bad English.

I am trying to learn how pollable channels work in Spring Integration, but I get an error. With a DirectChannel (no poller) everything works, but when I use a PollableChannel backed by a queue, compilation fails.

@SpringBootApplication
public class IntegrationSysoutApplication {

    @Bean
    DirectChannel outputChannel() {
        return new DirectChannel();
    }

    @Bean
    PollableChannel pollerChannel() {
        return new QueueChannel();
    }

/*    @MessagingGateway
    public interface MyGateway {

        @Gateway(requestChannel = "animalFlow.input")
        void process(Animal animal);
    }*/


    @Bean
    public IntegrationFlow animalFlow() {
        return IntegrationFlows.from(pollerChannel(), x -> x.poller(Pollers.fixedRate(100)))
                .filter((GenericSelector<Animal>) animal -> !animal.getAnimalType().equals("cat"))
                .handle("bService", "process")
                .handle("cService", "process")
                .handle("aService", "process")
                .channel("outputChannel")
                .get();
    }


    public static void main(String[] args) {
        ConfigurableApplicationContext ctx = SpringApplication
                .run(IntegrationSysoutApplication.class, args);
        DirectChannel outputChannel = ctx.getBean("outputChannel", DirectChannel.class);
        outputChannel
                .subscribe(message -> System.out.println("SUPER_MESSAGE: " + message));

        // Send the messages through the gateway
       /* ctx.getBean(MyGateway.class).process(new Animal("lion"));
        ctx.getBean(MyGateway.class).process(new Animal("cat"));
        ctx.getBean(MyGateway.class).process(new Animal("penguin"));*/
       
        // Send the messages straight to the flow's queue, bypassing the gateway
        MessageChannel inputChannel = ctx.getBean("pollerChannel", PollableChannel.class);
        inputChannel.send(MessageBuilder.withPayload(new Animal("penguin")).build());
        inputChannel.send(MessageBuilder.withPayload(new Animal("cat")).build());
        inputChannel.send(MessageBuilder.withPayload(new Animal("dog")).build());

        ctx.close();
    }

}

This is the line where I get the error; I can't add a poller for the queue:

IntegrationFlows.from(pollerChannel(), x -> x.poller(Pollers.fixedRate(100)))

Error:(47, 32) java: no suitable method found for from(java.lang.String,(x)->x.pol[...]100)))
    method org.springframework.integration.dsl.IntegrationFlows.from(java.lang.String,boolean) is not applicable...      

How can I use a poller with the PollableChannel in this situation?

Upvotes: 0

Views: 788

Answers (1)

Gary Russell

Reputation: 174484

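The poller configuration goes on the consumer of the PollableChannel, which in this case is the filter:

.filter(..., e -> e.poller...)

The consumer "polls" the channel for new messages, so the Pollers.fixedRate(100) from the question belongs in the filter's endpoint configurer, and IntegrationFlows.from(pollerChannel()) takes only the channel.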
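
To illustrate why the poller belongs to the consumer, here is a minimal plain-Java model of the idea. It is not Spring Integration code: the class PollingConsumerSketch, the queueChannel field and the consume method are hypothetical names, a BlockingQueue stands in for the QueueChannel, and a scheduled task stands in for the polling filter endpoint. The queue only buffers messages; the consumer's fixed-rate task is what pulls them out.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PollingConsumerSketch {

    // Stand-in for a QueueChannel: it buffers messages and never pushes them anywhere.
    static final BlockingQueue<String> queueChannel = new LinkedBlockingQueue<>();

    // Stand-in for the polling consumer: drains the queue at a fixed rate,
    // drops "cat", and returns what passed once `expected` messages were seen.
    static List<String> consume(int expected, long periodMillis) {
        List<String> passed = new CopyOnWriteArrayList<>();
        CountDownLatch seen = new CountDownLatch(expected);
        ScheduledExecutorService poller = Executors.newSingleThreadScheduledExecutor();
        poller.scheduleAtFixedRate(() -> {
            String animalType;
            // Non-blocking receive: take whatever is queued right now.
            while ((animalType = queueChannel.poll()) != null) {
                if (!animalType.equals("cat")) {
                    passed.add(animalType);
                }
                seen.countDown();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            // Wait until every message was polled (or give up after 5 seconds).
            seen.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        } finally {
            poller.shutdownNow();
        }
        return passed;
    }

    public static void main(String[] args) {
        queueChannel.add("penguin");
        queueChannel.add("cat");
        queueChannel.add("dog");
        System.out.println(consume(3, 100)); // prints [penguin, dog]
    }
}
```

The producer side has no notion of a polling rate here, which mirrors why the DSL expects the poller on the endpoint that reads from the channel rather than on the channel passed to from(...).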

Upvotes: 1
