sa3ad

Reputation: 1

TCP Spring Integration - Dispatcher has no subscribers for 'response' channel

I am using Spring Integration to make a TCP call to a server, sending a message and getting a response back. I prefer to use channel adapters so that I can send and receive messages in bulk. The problem I am facing is with the response channel: I get "Dispatcher has no subscribers for channel" for the response channel. Everything works fine except that the response is not delivered on the response channel. I can see the handshake at the server, and in the log I can see the response being put on the response and logger channels, but after that the exception is thrown. The configuration is:

    <gateway id="clientPositionsGateway" service-interface="MyGatewayInterface">
        <method name="fetchClientPositions" request-channel="clientPositionsRequestChannel" />
    </gateway>


    <channel id="clientPositionsRequestChannel" />

    <splitter input-channel="clientPositionsRequestChannel"
            output-channel="singleClientPositionsRequestChannel" />

    <channel id = "singleClientPositionsRequestChannel" />

    <transformer
        input-channel="singleClientPositionsRequestChannel"
        output-channel="dmQueryRequestChannel"
        ref="dmPosBaseQueryTransformer" />

    <channel id = "dmQueryRequestChannel">
        <!-- <dispatcher task-executor="executor"/> -->
    </channel>

    <ip:tcp-connection-factory id="csClient"
           type="client"
           host="somehost"
           port="12345"
           single-use="true"
           deserializer="connectionSerializeDeserialize"
            />

    <ip:tcp-outbound-channel-adapter id="dmServerOutboundAdapter"
            channel="dmQueryRequestChannel"
            connection-factory="csClient"
            order="2"
            />

    <ip:tcp-inbound-channel-adapter id="dmServerInboundAdapter"
            channel="dmQueryResponseChannel"
            connection-factory="csClient"
            error-channel="errorChannel"/>

    <channel id="dmQueryResponseChannel"/>

Upvotes: 0

Views: 7697

Answers (6)

sa3ad

Reputation: 1

Configuration settings:

<gateway id="clientPositionsGateway" service-interface="com.example.ClientPositionsGateway">
    <method name="fetchClientPositions" request-channel="clientPositionsRequestChannel"  reply-channel="dmQueryResponseChannel"/>  
</gateway>

<channel id="clientPositionsRequestChannel" />

<splitter input-channel="clientPositionsRequestChannel"
        output-channel="singleClientPositionsRequestChannel" />

<channel id = "singleClientPositionsRequestChannel" />

<transformer
    input-channel="singleClientPositionsRequestChannel"
    output-channel="dmQueryRequestChannel"
    ref="dmPosBaseQueryTransformer" />

<logging-channel-adapter channel="clientBytes2StringChannel"/>  

<channel id = "dmQueryRequestChannel">
     <dispatcher task-executor="executor"/> 
</channel>

<ip:tcp-connection-factory id="csClient" 
       type="client" 
       host="serverHost"
       port="22010"
       single-use="true"
       deserializer="connectionSerializeDeserialize"
       />

<ip:tcp-outbound-channel-adapter id="dmServerOutboundAdapter"
        channel="dmQueryRequestChannel" 
        connection-factory="csClient"
        /> 

<ip:tcp-inbound-channel-adapter id="dmServerInboundAdapter"
        channel="dmQueryResponseChannel" 
        connection-factory="csClient" 
        error-channel="errorChannel"/>

<transformer input-channel="dmQueryResponseChannel" output-channel="clientBytes2StringChannel" ref="dmPOSBaseQueryResponseTransformer"
        />

<channel id="dmQueryResponseChannel"/>

<channel id="clientBytes2StringChannel"/>

Upvotes: 0

sa3ad

Reputation: 1

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:beans="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://www.springframework.org/schema/integration" 
    xmlns:ip="http://www.springframework.org/schema/integration/ip"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/ip http://www.springframework.org/schema/integration/ip/spring-integration-ip.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <!-- intercept and log every message -->

    <logging-channel-adapter id="logger" level="DEBUG" />
    <wire-tap channel = "logger" />

    <gateway id="clientPositionsGateway"
         service-interface="com.example.ClientPositionsGateway">
        <method name="fetchClientPositions" request-channel="clientPositionsRequestChannel" reply-channel="dmQueryResponseChannel"/>
    </gateway>

    <channel id="clientPositionsRequestChannel" />

    <splitter input-channel="clientPositionsRequestChannel"
            output-channel="singleClientPositionsRequestChannel" />

    <channel id = "singleClientPositionsRequestChannel" />

    <transformer
        input-channel="singleClientPositionsRequestChannel"
        output-channel="dmQueryRequestChannel"
        ref="dmPosBaseTransQueryTransformer" />

    <channel id = "dmQueryRequestChannel">
        <dispatcher task-executor="executor"/>
    </channel>

    <ip:tcp-connection-factory id="csClient" 
           type="client" 
           host="hostserver"
           port="22010"
           single-use="true"
           deserializer="connectionSerializeDeserialize"
           />

    <ip:tcp-outbound-gateway id="dmServerGateway" 
           request-channel="dmQueryRequestChannel" 
           reply-channel="dmQueryResponseChannel"
           connection-factory="csClient"  />

    <channel id="dmQueryResponseChannel">
        <dispatcher task-executor="executor"/>
    </channel>

    <channel id="serverBytes2StringChannel" />

    <bean id="connectionSerializeDeserialize" class="com.example.DMQueryResponseSerializer"/>
    <bean id="dmPosBaseTransQueryTransformer" class="com.example.DMPOSBaseTransQueryTransformer"/> 
    <task:executor id="executor" pool-size="5"/>

</beans:beans>

Upvotes: 0

sa3ad

Reputation: 1

Here is the code that solved my problem:

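    import java.util.Arrays;
    import java.util.List;

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

    @ContextConfiguration(locations = {"/clientGIM2Position.xml"})
    @RunWith(SpringJUnit4ClassRunner.class)
    public class GetClientPositionsTest {

        // Gateway proxy created by Spring Integration from the XML configuration
        @Autowired
        ClientPositionsGateway clientPositionsGateway;

        @Test
        public void testGetPositions() throws Exception {
            // clientList was not defined in the original post; these IDs are placeholders
            List<String> clientList = Arrays.asList("client1", "client2");
            String positions = clientPositionsGateway.fetchClientPositions(clientList);
            System.out.println("returned !!!!" + positions);
        }
    }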

Upvotes: 0

sa3ad

Reputation: 1

import java.util.List;

public interface ClientPositionsGateway {

    String fetchClientPositions(List<String> clientList);

}

Upvotes: 0

Artem Bilan

Reputation: 121177

If your clientPositionsGateway is invoked from client threads, there is no reason to use executor channels. If you call clientPositionsGateway in a loop of a million iterations, try a Future gateway: http://docs.spring.io/spring-integration/docs/2.2.5.RELEASE/reference/html/messaging-endpoints-chapter.html#async-gateway and, again, do it without executor channels. I also don't see a reason to use reply-channel on both gateways. One more thing: you have a <splitter> before the <ip:tcp-outbound-gateway>, but where is the <aggregator> after the <ip:tcp-outbound-gateway>? In your current setup you get only the first reply from your clientPositionsGateway; all the others are dropped, because the TemporaryReplyChannel is already closed.
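A minimal sketch of the missing aggregator, assuming the outbound-gateway configuration posted above, direct channels (no task-executor), and reply-channel removed from the <gateway> method so that the aggregator is the only subscriber of dmQueryResponseChannel. The default aggregator relies on the correlation and sequence headers that the <splitter> adds, so it should release one combined message once all split replies have arrived. The combined payload is a list of replies, so the gateway method would presumably need to return a List rather than a String, or a transformer would have to join the replies; that part is an assumption, not something taken from the posted configuration.

```xml
<gateway id="clientPositionsGateway" service-interface="com.example.ClientPositionsGateway">
    <!-- No reply-channel here: replies travel back via the replyChannel header -->
    <method name="fetchClientPositions" request-channel="clientPositionsRequestChannel" />
</gateway>

<!-- Splitter and transformer unchanged from the posted configuration -->

<channel id="dmQueryRequestChannel" />

<ip:tcp-outbound-gateway id="dmServerGateway"
       request-channel="dmQueryRequestChannel"
       reply-channel="dmQueryResponseChannel"
       connection-factory="csClient" />

<channel id="dmQueryResponseChannel" />

<!-- Collects the replies for all split requests into one message.
     With no output-channel, the result is sent to the replyChannel header,
     which leads back to clientPositionsGateway. -->
<aggregator input-channel="dmQueryResponseChannel" />
```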

Upvotes: 0

Gary Russell

Reputation: 174494

As Artem said in his comment, 'Dispatcher has no subscribers' means here that there is no endpoint configured to receive the response on dmQueryResponseChannel, or the endpoint configured with that channel as its input channel is not started.
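For illustration, a channel gets a subscriber as soon as a started endpoint names it as its input channel. A minimal sketch, in which positionsResponseHandler, its handle method and the class name are hypothetical and do not appear in the original configuration:

```xml
<channel id="dmQueryResponseChannel" />

<!-- Subscribes to dmQueryResponseChannel; auto-startup is true by default,
     it is spelled out here only because a stopped endpoint gives the same error -->
<service-activator input-channel="dmQueryResponseChannel"
        ref="positionsResponseHandler"
        method="handle"
        auto-startup="true" />

<!-- Hypothetical bean, named for this sketch only -->
<beans:bean id="positionsResponseHandler" class="com.example.PositionsResponseHandler" />
```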

In any case, even when you resolve that, using independent adapters for request/response scenarios is tricky because the framework has no way to automatically correlate the response with the request. That's what the outbound gateway is for. You can use collaborating adapters, but you have to deal with the correlation yourself. If you are using a request/reply gateway to initiate the flow, you will have to use a technique such as the one explored in the tcp-client-server-multiplex sample, because with independent adapters you lose the replyChannel header used to get the response back to the gateway.

Or you can use a void-returning gateway to send the request, and an <int:outbound-channel-adapter/> so the framework will call back with the response and you can do your own correlation programmatically.
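A rough sketch of that last option, assuming the adapters and the csClient connection factory from the question. ClientPositionsSender (an interface whose method returns void), positionsResponseHandler and its handle method are hypothetical names introduced here; the correlation logic itself would live inside that handler and is not shown.

```xml
<!-- Hypothetical interface: void fetchClientPositions(List<String> clientList); -->
<gateway id="clientPositionsGateway" service-interface="com.example.ClientPositionsSender">
    <method name="fetchClientPositions" request-channel="clientPositionsRequestChannel" />
</gateway>

<!-- Splitter and transformer as in the question -->

<ip:tcp-outbound-channel-adapter id="dmServerOutboundAdapter"
        channel="dmQueryRequestChannel"
        connection-factory="csClient" />

<ip:tcp-inbound-channel-adapter id="dmServerInboundAdapter"
        channel="dmQueryResponseChannel"
        connection-factory="csClient"
        error-channel="errorChannel" />

<channel id="dmQueryResponseChannel" />

<!-- Subscribes to the response channel and calls back into application code,
     where each response has to be matched to its request by hand -->
<outbound-channel-adapter channel="dmQueryResponseChannel"
        ref="positionsResponseHandler"
        method="handle" />

<!-- Hypothetical bean, named for this sketch only -->
<beans:bean id="positionsResponseHandler" class="com.example.PositionsResponseHandler" />
```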

Upvotes: 1
