Reputation: 3169
Using the reference documentation and this answer, I set up an integration flow with an inbound JMS adapter that I can stop and start through a control bus:
Control bus:
@Configuration
@RequiredArgsConstructor
public class ControlBus {
    @Bean
    public IntegrationFlow controlBusFlow() {
        return IntegrationFlows.from(controlChannel()).log().controlBus().get();
    }
    @Bean
    public MessageChannel controlChannel() {
        return MessageChannels.direct().get();
    }
}
Integration flow:
@Component
@RequiredArgsConstructor // lombok
public class Admin {
    private final MessageChannel controlChannel;
    @Bean
    public IntegrationFlow adminFlow() {
        return IntegrationFlows
                .from(Http.inboundGateway("/admin")
                        .replyChannel("adminReply")
                        .requestMapping(r -> r.methods(DELETE, POST)))
                .route("headers.http_requestMethod", r -> r
                        .subFlowMapping(DELETE.toString(),
                                f -> f.handle((p, h) -> controlChannel.send(new GenericMessage<>("@fileInboundAdapter.stop()"))))
                        .subFlowMapping(POST.toString(),
                                f -> f.handle((p, h) -> controlChannel.send(new GenericMessage<>("@fileInboundAdapter.start()")))))
                .channel("adminReply")
                .get();
    }
}
It works fine; the fileInboundAdapter bean (defined in another flow) starts or stops as expected:
2022-10-26 17:31:01.356 INFO 11200 --- [nio-8082-exec-1] o.s.integration.handler.LoggingHandler : GenericMessage [payload=@fileInboundAdapter.stop(), headers={id=0b273024-8ca9-e383-bcfc-c487f14c044b, timestamp=1666798261355}]
2022-10-26 17:31:01.358 INFO 11200 --- [nio-8082-exec-1] o.s.i.e.SourcePollingChannelAdapter : stopped bean 'fileInboundAdapter'
But I'm curious whether I can change the syntax of the router to something like:
.route("headers.http_requestMethod",
        r -> r.subFlowMapping(DELETE.toString(),
                        f -> f.handle((p, h) -> "@fileInboundAdapter.stop()")
                                .channel("controlChannel"))
                .subFlowMapping(POST.toString(),
                        f -> f.handle((p, h) -> "@fileInboundAdapter.start()")
                                .channel("controlChannel")))
I've tried using both the "controlChannel" name and the injected channel, to no avail: the message is sometimes passed on and sometimes not. I guess I haven't properly understood how the channel method is meant to be used.
If I use the message channel name, "controlChannel", here's what I get in the log (I need 3 attempts before the bean is stopped)
2022-10-26 17:36:16.814 INFO 14768 --- [nio-8082-exec-2] o.s.integration.handler.LoggingHandler : GenericMessage [payload=@fileInboundAdapter.stop(), headers={content-length=337, http_requestMethod=DELETE, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1ea2cdcc, accept=*/*, authorization=Basic VXNlcjE6cGFzc3dvcmQ=, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1ea2cdcc, host=localhost:8082, http_requestUrl=http://localhost:8082/ldd-controller/admin, connection=keep-alive, id=909e81d5-490b-f6aa-9e59-0dc5807091c4, http_userPrincipal=UsernamePasswordAuthenticationToken [Principal=User1, Credentials=[PROTECTED], Authenticated=true, Details=WebAuthenticationDetails [RemoteIpAddress=0:0:0:0:0:0:0:1, SessionId=null], Granted Authorities=[Administrateur]], contentType=application/json;charset=UTF-8, accept-encoding=gzip, deflate, br, user-agent=PostmanRuntime/7.29.2, timestamp=1666798576813}]
2022-10-26 17:36:48.330 INFO 14768 --- [nio-8082-exec-3] o.s.integration.handler.LoggingHandler : GenericMessage [payload=@fileInboundAdapter.stop(), headers={content-length=337, http_requestMethod=DELETE, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4d68b976, accept=*/*, authorization=Basic VXNlcjE6cGFzc3dvcmQ=, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4d68b976, host=localhost:8082, http_requestUrl=http://localhost:8082/ldd-controller/admin, connection=keep-alive, id=d7608dc3-809d-7160-398d-f8e329d28e22, http_userPrincipal=UsernamePasswordAuthenticationToken [Principal=User1, Credentials=[PROTECTED], Authenticated=true, Details=WebAuthenticationDetails [RemoteIpAddress=0:0:0:0:0:0:0:1, SessionId=null], Granted Authorities=[Administrateur]], contentType=application/json;charset=UTF-8, accept-encoding=gzip, deflate, br, user-agent=PostmanRuntime/7.29.2, timestamp=1666798608330}]
2022-10-26 17:36:54.957 INFO 14768 --- [nio-8082-exec-4] o.s.integration.handler.LoggingHandler : GenericMessage [payload=@fileInboundAdapter.stop(), headers={content-length=337, http_requestMethod=DELETE, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@7037fa6a, accept=*/*, authorization=Basic VXNlcjE6cGFzc3dvcmQ=, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@7037fa6a, host=localhost:8082, http_requestUrl=http://localhost:8082/ldd-controller/admin, connection=keep-alive, id=e16b32f9-42cb-ef06-41ac-837efb8de9d9, http_userPrincipal=UsernamePasswordAuthenticationToken [Principal=User1, Credentials=[PROTECTED], Authenticated=true, Details=WebAuthenticationDetails [RemoteIpAddress=0:0:0:0:0:0:0:1, SessionId=null], Granted Authorities=[Administrateur]], contentType=application/json;charset=UTF-8, accept-encoding=gzip, deflate, br, user-agent=PostmanRuntime/7.29.2, timestamp=1666798614957}]
2022-10-26 17:36:54.960 INFO 14768 --- [nio-8082-exec-4] o.s.i.e.SourcePollingChannelAdapter : stopped bean 'fileInboundAdapter'
On the third attempt (the one that works), I also get a "No reply received within timeout" response on the HTTP client that sends the original request. I also need 3 requests if I want to start the bean again.
I get exactly the same result when injecting the channel.
Upvotes: 0
Views: 66
Reputation: 121427
With a configuration like f -> f.handle((p, h) -> controlChannel.send(new GenericMessage<>("@fileInboundAdapter.start()"))) you get a reply from this handler: true, the result of the controlChannel.send() call.
With .channel("controlChannel") you don't get a reply, because the call to fileInboundAdapter.stop() returns void.
I'm not sure why you don't get steady results with that router, but I don't think you should expect a reply from there anyway.
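The difference between the two variants can be sketched with a toy model in plain Java. This is only an illustration under simplifying assumptions, not how Spring Integration is implemented: ReplyModel and sendAndReceive are hypothetical names, and the "gateway" here simply waits on a queue for a reply. A handler that returns a value produces a reply; a handler that returns nothing (modelled as null, like a void method invoked through the control bus) leaves the caller waiting until the timeout.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class ReplyModel {

    // Hypothetical toy gateway: passes the request to a handler and waits for a reply.
    static Object sendAndReceive(Function<String, Object> handler, String request, long timeoutMillis) {
        BlockingQueue<Object> replyChannel = new ArrayBlockingQueue<>(1);
        Object result = handler.apply(request);
        if (result != null) {
            // Only a non-null result is turned into a reply message.
            replyChannel.offer(result);
        }
        try {
            Object reply = replyChannel.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            return reply != null ? reply : "No reply received within timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "No reply received within timeout";
        }
    }

    public static void main(String[] args) {
        // Models handle((p, h) -> controlChannel.send(...)): the boolean result is the reply.
        Function<String, Object> sendStyle = command -> Boolean.TRUE;
        // Models the control bus invoking a void method: there is nothing to reply with.
        Function<String, Object> voidStyle = command -> null;

        System.out.println(sendAndReceive(sendStyle, "@fileInboundAdapter.stop()", 100));
        System.out.println(sendAndReceive(voidStyle, "@fileInboundAdapter.stop()", 100));
    }
}
```

Running main prints true for the first call and the timeout text for the second, which mirrors the "No reply received within timeout" response seen by the HTTP client in the question.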
I'd suggest revising your configuration to use publishSubscribeChannel(Consumer<PublishSubscribeSpec> publishSubscribeChannelConfigurer), where one of the subscriber sub-flows goes to your router and another answers back to adminReply. We still need to understand what you would really like to reply to that HTTP call, though:
.publishSubscribeChannel(c -> c
        .subscribe(sf -> sf
                .route("headers.http_requestMethod",
                        r -> r.subFlowMapping(DELETE.toString(),
                                        f -> f.handle((p, h) -> "@fileInboundAdapter.stop()")
                                                .channel("controlChannel"))
                                .subFlowMapping(POST.toString(),
                                        f -> f.handle((p, h) -> "@fileInboundAdapter.start()")
                                                .channel("controlChannel")))
        ))
.channel("adminReply")
However, since you do not route to different channels, this looks more like a job for a plain handle():
.handle((p, h) -> DELETE.toString().equals(h.get("http_requestMethod")) ? "@fileInboundAdapter.stop()" : "@fileInboundAdapter.start()")
.channel("controlChannel")
It still has to be part of a publishSubscribe sub-flow, though, since you are not going to get a reply from that control bus call.
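Put together, the whole flow might look roughly like the sketch below. It is untested and rests on several assumptions: Spring Integration 5.x (where IntegrationFlows is still available), the controlChannel and fileInboundAdapter beans from the question, the class name AdminFlowSketch, and a hypothetical "accepted" string as the HTTP reply, since the question does not say what the client should receive. The explicit adminReply channel is left out here because the final handle() replies through the replyChannel header that the gateway sets.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.http.dsl.Http;

@Configuration
public class AdminFlowSketch {

    @Bean
    public IntegrationFlow adminFlow() {
        return IntegrationFlows
                .from(Http.inboundGateway("/admin")
                        .requestMapping(r -> r.methods(HttpMethod.DELETE, HttpMethod.POST)))
                .publishSubscribeChannel(c -> c
                        // Subscriber sub-flow: builds the control bus command and sends it on.
                        // No reply is expected from this branch.
                        .subscribe(sf -> sf
                                .handle((p, h) -> "DELETE".equals(h.get("http_requestMethod"))
                                        ? "@fileInboundAdapter.stop()"
                                        : "@fileInboundAdapter.start()")
                                .channel("controlChannel")))
                // Main flow continues after the sub-flow and produces the HTTP reply.
                // "accepted" is a placeholder payload chosen for this sketch.
                .handle((p, h) -> "accepted")
                .get();
    }
}
```

With this shape the control bus call and the HTTP reply are decoupled: the client gets an answer whether or not the command itself returns anything.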
Upvotes: 1