Reputation: 45
Good day!
I am trying to learn how to work with a pollable channel in Spring Integration, but I get a compile error. When I use a DirectChannel (without a poller) everything works, but when I use a pollable channel backed by a queue, I get the error shown below.
@SpringBootApplication
public class IntegrationSysoutApplication {
@Bean
DirectChannel outputChannel() {
return new DirectChannel();
}
@Bean
PollableChannel pollerChannel() {
return new QueueChannel();
}
/* @MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "animalFlow.input")
void process(Animal animal);
}*/
@Bean
public IntegrationFlow animalFlow() {
return IntegrationFlows.from(pollerChannel(), x -> x.poller(Pollers.fixedRate(100)))
.filter((GenericSelector<Animal>) animal -> !animal.getAnimalType().equals("cat"))
.handle("bService", "process")
.handle("cService", "process")
.handle("aService", "process")
.channel("outputChannel")
.get();
}
public static void main(String[] args) {
ConfigurableApplicationContext ctx = SpringApplication
.run(IntegrationSysoutApplication.class, args);
DirectChannel outputChannel = ctx.getBean("outputChannel", DirectChannel.class);
outputChannel
.subscribe(message -> System.out.println("SUPER_MESSAGE: " + message));
// Send the messages through the gateway
/* ctx.getBean(MyGateway.class).process(new Animal("lion"));
ctx.getBean(MyGateway.class).process(new Animal("cat"));
ctx.getBean(MyGateway.class).process(new Animal("penguin"));*/
// Send the messages directly to the flow's queue, bypassing the gateway
MessageChannel inputChannel = ctx.getBean("pollerChannel", PollableChannel.class);
inputChannel.send(MessageBuilder.withPayload(new Animal("penguin")).build());
inputChannel.send(MessageBuilder.withPayload(new Animal("cat")).build());
inputChannel.send(MessageBuilder.withPayload(new Animal("dog")).build());
ctx.close();
}
}
At this line I get an error and cannot add a poller for the queue:
IntegrationFlows.from(pollerChannel(), x -> x.poller(Pollers.fixedRate(100)))
Error:(47, 32) java: no suitable method found for from(java.lang.String,(x)->x.pol[...]100)))
method org.springframework.integration.dsl.IntegrationFlows.from(java.lang.String,boolean) is not applicable...
How can I use a poller with the pollable channel in this situation?
Upvotes: 0
Views: 788
Reputation: 174484
The poller configuration goes on the consumer of the PollableChannel, not on the channel itself. In this case the consumer is the filter endpoint:
.filter(..., e -> e.poller...)
The consumer "polls" the channel for new messages.
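Applied to the flow from the question, that could look roughly like the sketch below. It assumes the Spring Integration 5.x Java DSL used in the question, and that `Animal`, the `bService`/`cService`/`aService` beans, and the `outputChannel` bean exist as they do there (none of them are shown here). The class name `AnimalFlowConfig` is only illustrative.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.GenericSelector;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.messaging.PollableChannel;

// Illustrative configuration class; Animal, the three services and the
// "outputChannel" bean are assumed to be defined as in the question.
@Configuration
public class AnimalFlowConfig {

    @Bean
    PollableChannel pollerChannel() {
        return new QueueChannel();
    }

    @Bean
    public IntegrationFlow animalFlow() {
        // Start from the channel alone; from(...) takes no poller here.
        return IntegrationFlows.from(pollerChannel())
                // The filter is the first consumer of the QueueChannel,
                // so the poller is configured on its endpoint spec.
                .filter((GenericSelector<Animal>) animal -> !"cat".equals(animal.getAnimalType()),
                        e -> e.poller(Pollers.fixedRate(100)))
                .handle("bService", "process")
                .handle("cService", "process")
                .handle("aService", "process")
                .channel("outputChannel")
                .get();
    }
}
```

Only the first endpoint after the `QueueChannel` needs the poller; the later `.handle(...)` steps are connected by direct channels and are invoked on the poller's thread.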
Upvotes: 1