Reputation: 302
I'm currently working on a project built with Spring Integration 4.3.14 , and we decided to go to try to use DSL, but I'm having trouble trying to integrate different subflows.
I have the following IntegrationFlow defined:
@Bean
public IntegrationFlow mainFlow() {
return IntegrationFlows
.from(
databaseSource(),
c -> c.poller(Pollers.fixedDelay(5000).transactional().get()))
.split()
.log()
.gateway(f -> f
.transform(Transformer::transform)
.transform(AnotherTransformer::transform),
e -> e
.errorChannel("transformErrorChannel"))
.gateway(f -> f
.<MyEntity>handle((p, h) -> this.doSomething(p))
.<MyEntity>handle((p, h) -> this.doOtherThing(p)),
e -> e
.errorChannel("doErrorChannel"))
.channel("nullChannel")
.get();
}
All transform
and handle
invoked methods are non-void and return non-null values. The main reason we went for this approach is to have two different channels to handle errors depending on the part of the flow were they happened, so we can act accordingly.
Yet, when I try to run this code and I insert a record on the DB and the poller picks it up, it never goes beyond the first gateway. I just have this log lines:
2018-06-06 11:43:58.848 INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean : stopped org.springframework.integration.gateway.GatewayProxyFactoryBean@55d1f065
2018-06-06 11:43:58.848 INFO 6492 --- [ask-scheduler-1] ProxyFactoryBean$MethodInvocationGateway : started org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway@1863292e
2018-06-06 11:43:58.864 INFO 6492 --- [ask-scheduler-1] c.e.transformation.Transformer : Performing transformation.
2018-06-06 11:43:58.864 INFO 6492 --- [ask-scheduler-1] c.e.transformation.AnotherTransformer : Performing another transformation.
2018-06-06 11:43:58.848 INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean : started org.springframework.integration.gateway.GatewayProxyFactoryBean@55d1f065
2018-06-06 11:43:58.944 INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean : stopped org.springframework.integration.gateway.GatewayProxyFactoryBean@f9a5e3f
2018-06-06 11:43:58.944 INFO 6492 --- [ask-scheduler-1] ProxyFactoryBean$MethodInvocationGateway : started org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway@433a796
2018-06-06 11:43:58.944 INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean : started org.springframework.integration.gateway.GatewayProxyFactoryBean@f9a5e3f
It seems clear that the message does arrive on the first gateway, but apparently it's not being passed to the second gateway.
During startup, I see that SI creates two subFlows (#0 and #1) and two channels for each one (one for each operation, I guess) with 1 subscriber each.
I'd also tried changing the definition to the following:
@Bean
public IntegrationFlow getRecords() {
return IntegrationFlows
.from(
databaseSource(),
c -> c.poller(Pollers.fixedDelay(5000).transactional().get()))
.split()
.log()
.gateway(f -> f
.transform(Transformer::transform)
.transform(AnotherTransformer::transform),
e -> e
.errorChannel("transformErrorChannel")
.replyChannel("doThingsChannel"))
.get();
}
@Bean
public IntegrationFlow doThings() {
return IntegrationFlows
.from(
"doThingsChannel")
.gateway(f -> f
.<MyEntity>handle((p, h) -> this.doSomehting(p))
.<MyEntity>handle((p, h) -> this.doOtherThing(p)),
e -> e
.errorChannel("doErrorChannel"))
.get();
}
But eventually got the same problem, both setting the replyChannel
on the GatewayEndpointSpec
or adding an explicit .channel
to getRecords
flow after the gateway.
Upvotes: 1
Views: 227
Reputation: 121282
I've just done this test-case in the Spring Integration Java DSL project:
@Test
public void testGateways() {
IntegrationFlow flow = f -> f
.gateway(sf -> sf
.transform(p -> "foo#" + p)
.transform(p -> "bar#" + p))
.gateway(sf -> sf
.handle((p, h) -> "handle1:" + p)
.handle((p, h) -> "handle2:" + p))
.handle(System.out::println);
IntegrationFlowRegistration flowRegistration = this.integrationFlowContext.registration(flow).register();
flowRegistration.getInputChannel()
.send(new GenericMessage<>("test"));
flowRegistration.destroy();
}
My output is like this:
GenericMessage [payload=handle2:handle1:bar#foo#test, headers={id=ae09df5c-f63e-4b68-d73c-29b85f3689a8, timestamp=1528314852110}]
So, both gateways work as expected and all the transformers and handlers are applied. Plus the result of the last gateway is polled to the main flow for the last System.out
step.
Not sure what's going on in your case: only an idea that your .transform(AnotherTransformer::transform)
doesn't return value or anything else happens there.
Regarding a replyChannel
option. It is not where to send a result of the gateway. This is where to wait for the reply to return:
/**
* Specify the channel from which reply messages will be received; overrides the
* encompassing gateway's default reply channel.
* @return the channel name.
*/
String replyChannel() default "";
Upvotes: 1