Marc Tarin

Reputation: 3169

Proper syntax to send a message to the control bus channel in an integration flow

Using the reference documentation and this answer, I set up an integration flow with an inbound JMS adapter that I can stop and start using a control bus:

Control bus:

@Configuration
@RequiredArgsConstructor
public class ControlBus {
    @Bean
    public IntegrationFlow controlBusFlow() {
        return IntegrationFlows.from(controlChannel()).log().controlBus().get();
    }

    @Bean
    public MessageChannel controlChannel() {
        return MessageChannels.direct().get();
    }
}

Integration flow:

@Component
@RequiredArgsConstructor // lombok
public class Admin {
    private final MessageChannel controlChannel;

    @Bean
    public IntegrationFlow adminFlow() {
        return IntegrationFlows
            .from(Http.inboundGateway("/admin")
                      .replyChannel("adminReply")
                      .requestMapping(r -> r.methods(DELETE, POST)))
            .route("headers.http_requestMethod", r -> r
                    .subFlowMapping(DELETE.toString(),
                        f -> f.handle((p, h) -> controlChannel.send(new GenericMessage<>("@fileInboundAdapter.stop()"))))
                    .subFlowMapping(POST.toString(),
                        f -> f.handle((p, h) -> controlChannel.send(new GenericMessage<>("@fileInboundAdapter.start()")))))
            .channel("adminReply")
            .get();
    }
}

It works fine, the fileInboundAdapter bean (defined in another flow) starts or stops as expected:

2022-10-26 17:31:01.356  INFO 11200 --- [nio-8082-exec-1] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=@fileInboundAdapter.stop(), headers={id=0b273024-8ca9-e383-bcfc-c487f14c044b, timestamp=1666798261355}]
2022-10-26 17:31:01.358  INFO 11200 --- [nio-8082-exec-1] o.s.i.e.SourcePollingChannelAdapter      : stopped bean 'fileInboundAdapter'

But I'm curious whether I can change the syntax of the router to something like:

.route("headers.http_requestMethod",
       r -> r.subFlowMapping(DELETE.toString(),
                             f -> f.handle((p, h) -> "@fileInboundAdapter.stop()")
                                   .channel("controlChannel"))
             .subFlowMapping(POST.toString(),
                             f -> f.handle((p, h) -> "@fileInboundAdapter.start()")
                                   .channel("controlChannel")))

I've tried both the "controlChannel" name and the injected channel, to no avail: the message is sometimes passed and sometimes not. I guess I haven't properly understood how the channel method works.

If I use the message channel name, "controlChannel", here's what I get in the log (I need 3 attempts before the bean is stopped):

2022-10-26 17:36:16.814  INFO 14768 --- [nio-8082-exec-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=@fileInboundAdapter.stop(), headers={content-length=337, http_requestMethod=DELETE, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1ea2cdcc, accept=*/*, authorization=Basic VXNlcjE6cGFzc3dvcmQ=, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@1ea2cdcc, host=localhost:8082, http_requestUrl=http://localhost:8082/ldd-controller/admin, connection=keep-alive, id=909e81d5-490b-f6aa-9e59-0dc5807091c4, http_userPrincipal=UsernamePasswordAuthenticationToken [Principal=User1, Credentials=[PROTECTED], Authenticated=true, Details=WebAuthenticationDetails [RemoteIpAddress=0:0:0:0:0:0:0:1, SessionId=null], Granted Authorities=[Administrateur]], contentType=application/json;charset=UTF-8, accept-encoding=gzip, deflate, br, user-agent=PostmanRuntime/7.29.2, timestamp=1666798576813}]
2022-10-26 17:36:48.330  INFO 14768 --- [nio-8082-exec-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=@fileInboundAdapter.stop(), headers={content-length=337, http_requestMethod=DELETE, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4d68b976, accept=*/*, authorization=Basic VXNlcjE6cGFzc3dvcmQ=, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4d68b976, host=localhost:8082, http_requestUrl=http://localhost:8082/ldd-controller/admin, connection=keep-alive, id=d7608dc3-809d-7160-398d-f8e329d28e22, http_userPrincipal=UsernamePasswordAuthenticationToken [Principal=User1, Credentials=[PROTECTED], Authenticated=true, Details=WebAuthenticationDetails [RemoteIpAddress=0:0:0:0:0:0:0:1, SessionId=null], Granted Authorities=[Administrateur]], contentType=application/json;charset=UTF-8, accept-encoding=gzip, deflate, br, user-agent=PostmanRuntime/7.29.2, timestamp=1666798608330}]
2022-10-26 17:36:54.957  INFO 14768 --- [nio-8082-exec-4] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=@fileInboundAdapter.stop(), headers={content-length=337, http_requestMethod=DELETE, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@7037fa6a, accept=*/*, authorization=Basic VXNlcjE6cGFzc3dvcmQ=, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@7037fa6a, host=localhost:8082, http_requestUrl=http://localhost:8082/ldd-controller/admin, connection=keep-alive, id=e16b32f9-42cb-ef06-41ac-837efb8de9d9, http_userPrincipal=UsernamePasswordAuthenticationToken [Principal=User1, Credentials=[PROTECTED], Authenticated=true, Details=WebAuthenticationDetails [RemoteIpAddress=0:0:0:0:0:0:0:1, SessionId=null], Granted Authorities=[Administrateur]], contentType=application/json;charset=UTF-8, accept-encoding=gzip, deflate, br, user-agent=PostmanRuntime/7.29.2, timestamp=1666798614957}]
2022-10-26 17:36:54.960  INFO 14768 --- [nio-8082-exec-4] o.s.i.e.SourcePollingChannelAdapter      : stopped bean 'fileInboundAdapter'

On the third attempt (the one that works), I also get a "No reply received within timeout" response on the HTTP client that sent the original request. I also need 3 requests to start the bean again.

I get exactly the same result when injecting the channel.

Upvotes: 0

Views: 66

Answers (1)

Artem Bilan

Reputation: 121427

With a configuration like f -> f.handle((p, h) -> controlChannel.send(new GenericMessage<>("@fileInboundAdapter.start()"))) the handler returns true, the result of the controlChannel.send() call, and that value becomes the reply.

With .channel("controlChannel") you don't get a reply, because the fileInboundAdapter.stop() call returns void.
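
To make the difference concrete, here is a toy model in plain Java. It is not Spring Integration code: ReplySketch and sendAndReceive are hypothetical names, and the queue only stands in for the temporary reply channel the gateway waits on. It assumes a handler that returns null produces no reply, which is the situation with a void control bus call.

```java
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Toy model only: hypothetical names, not Spring Integration classes.
final class ReplySketch {

    // Calls the handler, posts a non-null result as the reply,
    // then waits up to timeoutMs for a reply to arrive.
    static Optional<Object> sendAndReceive(Function<String, Object> handler,
                                           String request, long timeoutMs) {
        BlockingQueue<Object> replyChannel = new ArrayBlockingQueue<>(1);
        Object result = handler.apply(request);
        if (result != null) {
            replyChannel.offer(result);
        }
        try {
            return Optional.ofNullable(replyChannel.poll(timeoutMs, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Like handle((p, h) -> controlChannel.send(...)): the handler returns true.
        Optional<Object> withReply =
                sendAndReceive(cmd -> Boolean.TRUE, "@fileInboundAdapter.stop()", 100);
        // Like .channel("controlChannel") with a void method: nothing comes back.
        Optional<Object> noReply =
                sendAndReceive(cmd -> null, "@fileInboundAdapter.stop()", 100);
        System.out.println("send() variant reply: " + withReply);
        System.out.println("void variant reply: " + noReply);
    }
}
```

In this model the second call comes back empty only after the timeout elapses, which matches the "No reply received within timeout" response seen on the HTTP client.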

I'm not sure why you don't get steady results with that router, but I don't think you should expect a reply from there anyway.

I'd suggest revising your configuration to use publishSubscribeChannel(Consumer<PublishSubscribeSpec> publishSubscribeChannelConfigurer), where one of the subscriber sub-flows goes to your router and another answers back to adminReply. We still need to understand what you would actually like to reply to that HTTP call, though:

      .publishSubscribeChannel(c -> c
          .subscribe(sf -> sf
              .route("headers.http_requestMethod",
                  r -> r.subFlowMapping(DELETE.toString(),
                            f -> f.handle((p, h) -> "@fileInboundAdapter.stop()")
                                  .channel("controlChannel"))
                        .subFlowMapping(POST.toString(),
                            f -> f.handle((p, h) -> "@fileInboundAdapter.start()")
                                  .channel("controlChannel")))))
      .channel("adminReply")

However, since you don't route to different channels, a plain handle() would do instead of a router:

 .handle((p, h) -> DELETE.toString().equals(h.get("http_requestMethod")) ? "@fileInboundAdapter.stop()" : "@fileInboundAdapter.start()")
 .channel("controlChannel")

It still has to be part of a publishSubscribeChannel sub-flow, though, since you are not going to get a reply from that control bus call.

Upvotes: 1
