KJQ

Spring Integration Usage and Approach Validation

I am testing Spring Integration as a way to tie together disparate modules (for now, all within the same Spring Boot application) and services into a unified flow that starts from a single entry point.

I am looking for the following clarifications with Spring Integration if possible:

  1. Is the below code the right way to structure flows using the DSL?
  2. In "C" below, can I bubble up the result to the "B" flow?
  3. Is using the DSL vs. the XML the better approach?
  4. I am confused as to how to correctly "terminate" a flow?

Flow Overview

In the code below, I am just publishing a page to a destination. The overall flow goes like this.

  1. Publisher flow listens for the payload and splits it into parts.
  2. Content flow filters out pages and splits them into parts.
    1. AWS flow subscribes and handles the part.
    2. File flow subscribes and handles the part.

Eventually, the Publisher flow may have additional, very different types of consumers that are not content, which is why I split the publisher from the content.

A) Publish Flow (publisher.jar):

This is my "main" flow, initiated through a gateway. The intent is that it serves as the entry point that triggers all publishing flows.

  1. Receive the message
  2. Preprocess the message and save it.
  3. Split the payload into individual entries contained in it.
  4. Enrich each of the entries with the rest of the data
  5. Put each entry on the output channel.

Below is the code:

@Bean
IntegrationFlow flowPublish()
{
    return f -> f
        .channel(this.publishingInputChannel())
        //Prepare the payload
        .<Package>handle((p, h) -> this.save(p))
        //Split the artifact resolved items
        .split(Package.class, Package::getItems)
        //Find the artifact associated to each item (if available)
        .enrich(
            e -> e.<PackageEntry>requestPayload(
                m ->
                {
                    final PackageEntry item = m.getPayload();
                    final Publishable publishable = this.findPublishable(item);
                    item.setPublishable(publishable);
                    return item;
                }))
        //Send the results to the output channel
        .channel(this.publishingOutputChannel());
}

B) Content Flow (content.jar)

This module's responsibility is to handle incoming "content" payloads (i.e. Page in this case) and split/route them to the appropriate subscriber(s).

  1. Listen on the publisher output channel
  2. Filter the entries by Page type only
  3. Add the original payload to the header for later
  4. Transform the payload into the actual type
  5. Split the page into its individual elements (blocks)
  6. Route each element to the appropriate PubSub channel.

At least for now, the subscribed flows do not return any response; they should just fire and forget. However, I would like to know how to bubble up the result when using the pub-sub channel.

Below is the code:

@Bean
@ContentChannel("asset")
MessageChannel contentAssetChannel()
{
    return MessageChannels.publishSubscribe("assetPublisherChannel").get();

    //return MessageChannels.queue(10).get();
}

@Bean
@ContentChannel("page")
MessageChannel contentPageChannel()
{
    return MessageChannels.publishSubscribe("pagePublisherChannel").get();

    //return MessageChannels.queue(10).get();
}

@Bean
IntegrationFlow flowPublishContent()
{
    return flow -> flow
        .channel(this.publishingChannel)
        //Filter for root pages (which contain elements)
        .filter(PackageEntry.class, p -> p.getPublishable() instanceof Page)
        //Put the publishable details in the header
        .enrichHeaders(e -> e.headerFunction("item", Message::getPayload))
        //Transform the item to a Page
        .transform(PackageEntry.class, PackageEntry::getPublishable)
        //Split page into components and put the type in the header
        .split(Page.class, this::splitPageElements)
        //Route content based on type to the subscriber
        .<PageContent, String>route(PageContent::getType, mapping -> mapping
            .resolutionRequired(false)
            .subFlowMapping("page", sf -> sf.channel(this.contentPageChannel()))
            .subFlowMapping("image", sf -> sf.channel(this.contentAssetChannel()))
            .defaultOutputToParentFlow())
        .channel(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME);
}

C) AWS Content (aws-content.jar)

This module is one of many potential subscribers to the content specific flows. It handles each element individually based off of the routed channel published to above.

  1. Subscribe to the appropriate channel.
  2. Handle the action appropriately.

There can be multiple modules with flows that subscribe to the above routed output channels, this is just one of them.

As an example, the "contentPageChannel" could invoke the below flowPageToS3 (in the aws module) and also a flowPageToFile (in another module).

Below is the code:

@Bean
IntegrationFlow flowAssetToS3()
{
    return flow -> flow
        .channel(this.assetChannel)
        .publishSubscribeChannel(c -> c
            .subscribe(s -> s
                .<PageContent>handle((p, h) ->
                                     {
                                         return this.publishS3Asset(p);
                                     })));
}

@Bean
IntegrationFlow flowPageToS3()
{
    return flow -> flow
        .channel(this.pageChannel)
        .publishSubscribeChannel(c -> c
            .subscribe(s -> s
                .<Page>handle((p, h) -> this.publishS3Page(p))
                .enrichHeaders(e -> e.header("s3Command", Command.UPLOAD.name()))
                .handle(this.s3MessageHandler())));
}


Answers (1)

Artem Bilan

First of all, there is a lot of content in your question: it's too hard to keep all the info in mind while reading. This is your project, so you are naturally very familiar with the subject. But for us it is something new, and we may just give up even on reading it, let alone attempting to answer.

Anyway, I'll try to answer the questions from the beginning of your post, although I feel like this is going to start a long "what? how? why?" discussion...

Is the below code the right way to structure flows using the DSL?

It really depends on your logic. It is a good idea to split it into logical components, but a separate jar for each of them might be overhead. Looking at your code, it seems to me that you still collect everything into a single Spring Boot application and just @Autowired the appropriate channels into each @Configuration. So, yes, a separate @Configuration is a good idea, but a separate jar is overhead, IMHO.

In "C" below, can I bubble up the result to the "B" flow?

Well, since the story is about publish-subscribe, it is really unusual to wait for a reply. How many replies are you going to get from those subscribers? Right, that is the problem: we can send to many subscribers, but we can't get the replies from all of them as a single return. Let's come back to Java code: a method can have several arguments, but only one return value. The same applies here in messaging. Anyway, you may take a look at the Scatter-Gather pattern implementation.
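
To make the "many subscribers, one return" point concrete, here is a minimal plain-Java sketch of the scatter-gather idea. It is only an illustration and not the Spring Integration API: the class ScatterGatherSketch, the method scatterGather and the two lambda subscribers are hypothetical names standing in for the S3 and file flows. The request is handed to every subscriber, and the replies are gathered into one aggregate list, which is the single reply the caller gets back.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-Java illustration of scatter-gather; NOT the Spring Integration API.
final class ScatterGatherSketch {

    // Scatter: hand the same request to every subscriber asynchronously.
    // Gather: wait for all replies and fold them into ONE aggregate result.
    static <T, R> List<R> scatterGather(T request, List<Function<T, R>> subscribers) {
        List<CompletableFuture<R>> pending = subscribers.stream()
            .map(s -> CompletableFuture.supplyAsync(() -> s.apply(request)))
            .collect(Collectors.toList());
        // join() is called in subscriber order, so replies keep that order.
        return pending.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical subscribers standing in for the S3 and file flows.
        List<Function<String, String>> subscribers = List.of(
            page -> "s3:" + page,
            page -> "file:" + page);
        System.out.println(scatterGather("home", subscribers)); // prints [s3:home, file:home]
    }
}
```

In Spring Integration the gathering part is done by an aggregator, so the caller still receives exactly one reply message, just as a Java method returns one value.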

Is using the DSL vs. the XML the better approach?

Both are just high-level APIs. Underneath there are the same integration components. Looking at your app, you would arrive at the same distributed solution with the XML configuration. I don't see a reason to step back from the Java DSL. At least it is less verbose for you.

I am confused as to how to correctly "terminate" a flow?

That's absolutely unclear from your big description. If you send to S3 or to a file, that is the termination. There is no reply from those components, so there is nowhere to go and nothing more to do. The flow just stops. It is the same as a Java method that returns void. If you are worried about your entry point gateway, just make its method void and don't wait for any replies. See Messaging Gateway for more info.
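
The void analogy can be sketched in plain Java as well. This is only an illustration under stated assumptions, not Spring Integration code: OneWayFlowSketch, PublishGateway and gatewayTo are hypothetical names, and the Consumer stands in for a one-way endpoint such as the S3 or file handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java analogy only; none of these types are Spring Integration classes.
final class OneWayFlowSketch {

    // Like a gateway method declared void: the caller sends and does not wait for a reply.
    interface PublishGateway {
        void publish(String page);
    }

    // Wires the gateway to a terminal, one-way handler (a stand-in for the S3 or file endpoint).
    static PublishGateway gatewayTo(Consumer<String> terminalHandler) {
        // The flow ends inside the handler; nothing is returned to the caller.
        return page -> terminalHandler.accept(page.trim());
    }

    public static void main(String[] args) {
        List<String> uploaded = new ArrayList<>();
        PublishGateway gateway = gatewayTo(uploaded::add);
        gateway.publish(" home ");
        System.out.println(uploaded); // prints [home]
    }
}
```

The point of the sketch is that the last handler consumes the message and produces no reply, so there is nothing left to terminate explicitly.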
