Paul
Paul

Reputation: 213

Reuse of service-activator for several gateway methods with Splitter

does somebody happen to know if it is valid to reuse a service activator and so also the output-channel using several methods (inbound) especially with a splitter and aggregator.

--> Always with result on the gateway.

In several tests it seems to work fine. As soon I added a splitter with an aggregator I get wrongs result routed to the gateway which then fails with a conversion exception (here in my case it cannot convert boolean to integer).

Thanks,

Paul

Flow

Spring Integration Flow

Spring Integration Config

    <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd">

    <int:gateway service-interface="activity.BulbMessageGateway">
        <int:method name="sendToBulb" request-channel="bulbMessages" reply-channel="bulbSendResult"></int:method>
        <int:method name="updateHomeLightStatus" request-channel="homeBulbEntity" reply-channel="homeBulbEntityResult">
        </int:method>
        <int:method name="updateLightStatus" request-channel="bulbEntities" reply-channel="bulbSendResult">
            <int:header name="homeId" expression="#args[0].homeId"/>
            <int:header name="bulbId" expression="#args[0].strongId"/>
        </int:method>
    </int:gateway>

    <int:channel id="bulbMessages" />
    <int:channel id="bulbSendResult" />
    <int:channel id="bulbEntities" />

    <int:channel id="homeBulbEntity" />
    <int:channel id="homeBulbEntityResult" />

    <int:chain input-channel="homeBulbEntity" output-channel="bulbEntities">
        <int:splitter expression="payload.bulbs" />
        <int:header-enricher>
            <int:header name="bulbId" expression="payload.strongId"/>
            <int:header name="homeId" expression="payload.homeId"/>
        </int:header-enricher>
    </int:chain>

    <int:transformer method="bulbToLightStatus" input-channel="bulbEntities" output-channel="bulbMessages">
        <bean class="util.BulbTransformer"></bean>
    </int:transformer>

    <int:aggregator input-channel="bulbSendResult" output-channel="homeBulbEntityResult" method="aggregate">
        <bean class="util.BooleanAggregator" />
    </int:aggregator>

    <int:service-activator input-channel="bulbMessages" output-channel="bulbSendResult" method="send">
        <bean class="activity.BulbWebsocketMessageSenderBA" />
    </int:service-activator>
</beans>

Unit test

@Test
public void sendMessageNoReceiver() {
    assertFalse(gateway.sendToBulb(new HomeId("1"), new BulbId("1"), BulbMessageBuilder.restart("foo")));
}

@Test
public void sendMessageWithReceiver() {
    MockSession<BulbId, BulbBE> bulbSession = new MockSession<BulbId, BulbBE>(new BulbBE(HomeId.of("1"), BulbId.of("1"), "bulb", "pass"));
    registry.addBulbSession(bulbSession);
    assertTrue(gateway.sendToBulb(new HomeId("1"), new BulbId("1"), BulbMessageBuilder.restart("foo")));
    assertEquals(1, bulbSession.receivedMessages());
}

@Test
public void updateBulbStatus() {
    final MockSession<BulbId, BulbBE> bulbSession1 = new MockSession<BulbId, BulbBE>(new BulbBE(HomeId.of("1"), BulbId.of("1"), "bulb", "pass"));
    assertFalse(gateway.updateLightStatus(bulbSession1.getIdentity()));

    registry.addBulbSession(bulbSession1);
    assertTrue(gateway.updateLightStatus(bulbSession1.getIdentity()));
    assertEquals(1, bulbSession1.receivedMessages());

    final MockSession<BulbId, BulbBE> bulbSession2 = new MockSession<BulbId, BulbBE>(new BulbBE(HomeId.of("1"), BulbId.of("2"), "bulb", "pass"));
    assertFalse(gateway.updateLightStatus(bulbSession2.getIdentity()));

    registry.addBulbSession(bulbSession2);
    assertTrue(gateway.updateLightStatus(bulbSession2.getIdentity()));
    assertTrue(gateway.updateLightStatus(bulbSession2.getIdentity()));

    assertEquals(2, bulbSession2.receivedMessages());
    assertEquals(1, bulbSession1.receivedMessages());
}
@Test
public void updateHomeBulbStatus() {
    final HomeBE home = new HomeBE();
    home.setId(new ObjectId());

    final MockSession<BulbId, BulbBE> bulbSession1 = new MockSession<BulbId, BulbBE>(new BulbBE(home.getStrongId(), BulbId.of("1"), "bulb", "pass"));
    registry.addBulbSession(bulbSession1);
    final MockSession<BulbId, BulbBE> bulbSession2 = new MockSession<BulbId, BulbBE>(new BulbBE(home.getStrongId(), BulbId.of("2"), "bulb", "pass"));
    registry.addBulbSession(bulbSession2);

    home.addBulb(bulbSession1.getIdentity());

    assertEquals(1, gateway.updateHomeLightStatus(home));
    assertEquals(1, bulbSession1.receivedMessages());
    assertEquals(0, bulbSession2.receivedMessages());

    home.addBulb(bulbSession2.getIdentity());
    assertEquals(2, gateway.updateHomeLightStatus(home));
    assertEquals(2, bulbSession1.receivedMessages());
    assertEquals(1, bulbSession2.receivedMessages());
}

The last test fails if it is executed together with the other tests. It passes if it is executed alone.

The error is that the last method (using the splitter) receives now a boolean, which seems to be a result of the other two methods registered. The result of these methods is a boolean.

Upvotes: 1

Views: 879

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121542

Please, share config on the matter.

And according your graph it would be better if you'd minimize the config as much as possible to isolate the problem.

From other side, please, be more specific: your question is fully unclear.

That is your own service. How can we be sure that it is safe to be used in different places? Only you, as an author, can determine that.

UPDATE

Sorry for the delay. Was busy with the release.

And thank for sharing the config for your use-case.

Now I see the problem.

You use everywhere on the gateway's methods a reply-channel. See the documentation on the matter when you need that:

Typically you don’t have to specify the default-reply-channel, since a Gateway will auto-create a temporary, anonymous reply channel, where it will listen for the reply. However, there are some cases which may prompt you to define a default-reply-channel (or reply-channel with adapter gateways such as HTTP, JMS, etc.).

Since you use the same bulbSendResult in different places the behavior is really unpredictable. Moreoever that channel is DirectChannel, so the round-robin balancer are on the scene.

You should get rid of those reply-channel's at all and just rely from your downstream components on the replyChannel header. Therefore you should remove those output-channels in the components which are intended to return replies to your gateway.

For example the last service-activator should be just like this:

<int:service-activator input-channel="bulbMessages" method="send">
    <bean class="activity.BulbWebsocketMessageSenderBA"/>
</int:service-activator>

Since you general question how to reuse this service-activator, I'm answering to the question having the config from you:

<int:chain input-channel="homeBulbEntity">
    <int:splitter expression="payload.bulbs"/>
    <int:header-enricher>
        <int:header name="bulbId" expression="payload.strongId"/>
        <int:header name="homeId" expression="payload.homeId"/>
    </int:header-enricher>
    <int:transformer method="bulbToLightStatus">
        <bean class="util.BulbTransformer"/>
    </int:transformer>
    <int:gateway request-channel="bulbMessages"/>
    <int:aggregator method="aggregate">
        <bean class="util.BooleanAggregator"/>
    </int:aggregator>
</int:chain>

Pay attention to the absent output-channel for the <chain>. Therefore it sends reply directly to the replyChannel from headers and as a return to your updateHomeLightStatus gateway's method.

Another trick is that <int:gateway request-channel="bulbMessages"/> which sends messages in the middle of the <chain> flow to your <service-activator> and wait for the reply from there exactly the same way as a top-level gateway - via replyChannel header. For the <service-activator> without an output-channel it is a black-box where to send the reply. It uses just replyChannel from headers!

After receiving the reply gateway in the <chain> push the message to the <aggregator>.

When aggregator will do its logic, the result will be send to the top-level gateway as an output from the <chain>.

That's all.

Let me know what else isn't clear here.

Upvotes: 1

Related Questions