bmarcj

Reputation: 51

Spring Integration DSL - composition with gateway blocks thread

I am trying to learn how to build IntegrationFlows as reusable units and join them together.

I set up a very simple processing integration flow:


    IntegrationFlow processingFlow = f -> f
            .<String>handle((p, h) -> process(p))
            .log();

    flowContext.registration(processingFlow)
            .id("testProcessing")
            .autoStartup(false)
            .register();

Processing is very simple:

    public String process(String process) {
        return process + " has been processed";
    }

Then I compose a flow from a source, using .gateway() to join the source to the processing:

    MessageChannel beginningChannel = MessageChannels.direct("beginning").get();

    StandardIntegrationFlow composedFlow = IntegrationFlows
            .from(beginningChannel)
            .gateway(processingFlow)
            .log()
            .get();

    flowContext.registration(composedFlow)
            .id("testComposed")
            .autoStartup(false)
            .addBean(processingFlow)
            .register();

Then I start the flow and send a couple of messages:

    composedFlow.start();

    beginningChannel.send(MessageBuilder.withPayload("first string").build());
    beginningChannel.send(MessageBuilder.withPayload("second string").build());

The logging handler confirms the handle method has been called for the first message, but the main thread then sits idle, and the second message is never processed.

Is this not the correct way to compose an integration flow from building blocks? Doing so with channels requires registering each channel as a bean, and I'm trying to do all of this dynamically.

Upvotes: 0

Views: 396

Answers (1)

Artem Bilan

Reputation: 121442

Use logAndReply() instead of log() at the end of processingFlow; see their JavaDocs for the difference. A log() at the end of a flow makes it one-way. That is why you are blocked: the gateway waits for a reply, but your current flow definition never produces one. Unfortunately, we can't detect that at the framework level, because there are legitimate cases where a flow does not return a reply, for example because of routing or filtering logic. The gateway can be configured with a reply timeout; by default it is infinite.
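
To make the blocking easier to picture, here is a minimal plain-Java model of a request/reply gateway. It does not use Spring Integration and is not how the framework is implemented. The class `GatewayReplyModel`, the method `sendAndReceive`, and the `ONE_WAY` and `REPLYING` sub-flows are hypothetical names invented for this sketch. `ONE_WAY` stands in for a sub-flow that ends in log(), and `REPLYING` for one that ends in logAndReply(). Returning `null` on timeout is this sketch's own choice.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

class GatewayReplyModel {

    // Same processing step as in the question.
    static String process(String payload) {
        return payload + " has been processed";
    }

    // Hypothetical one-way sub-flow: processes and logs, but never supplies a reply.
    static final BiConsumer<String, CompletableFuture<String>> ONE_WAY =
            (payload, reply) -> System.out.println("log: " + process(payload));

    // Hypothetical replying sub-flow: processes, logs, and supplies the reply.
    static final BiConsumer<String, CompletableFuture<String>> REPLYING =
            (payload, reply) -> {
                String result = process(payload);
                System.out.println("log: " + result);
                reply.complete(result);
            };

    // Hypothetical gateway: hands the payload to the sub-flow, then waits for a reply.
    // Without the timeout, a one-way sub-flow would leave the caller waiting forever.
    static String sendAndReceive(BiConsumer<String, CompletableFuture<String>> subFlow,
                                 String payload, long timeoutMs) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        subFlow.accept(payload, reply);
        try {
            return reply.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null; // sketch's choice: report "no reply in time" as null
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The sub-flow logs, then the caller gives up after 200 ms and gets null.
        System.out.println(sendAndReceive(ONE_WAY, "first string", 200));
        // The sub-flow logs and replies, so the caller gets the processed string at once.
        System.out.println(sendAndReceive(REPLYING, "second string", 200));
    }
}
```

With an unbounded wait in place of the 200 ms timeout, the first call would never return and the second would never start, which matches the symptom in the question.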

Upvotes: 1
