Reputation: 1
I am using Spring Integration to make a TCP call to a server: I send a message and get a response back. I would prefer to use channel adapters so that I can send and receive messages in bulk. The problem is with the response channel: I get "Dispatcher has no subscribers for channel" for the response channel. Everything else works. I can see the handshake at the server, and the log shows the response being put on the response and logger channels, but after that the exception is thrown. The configuration is:
<gateway id="clientPositionsGateway" service-interface="MyGatewayInterface">
    <method name="fetchClientPositions" request-channel="clientPositionsRequestChannel" />
</gateway>
<channel id="clientPositionsRequestChannel" />
<splitter input-channel="clientPositionsRequestChannel"
          output-channel="singleClientPositionsRequestChannel" />
<channel id="singleClientPositionsRequestChannel" />
<transformer
    input-channel="singleClientPositionsRequestChannel"
    output-channel="dmQueryRequestChannel"
    ref="dmPosBaseQueryTransformer" />
<channel id="dmQueryRequestChannel">
    <!-- <dispatcher task-executor="executor"/> -->
</channel>
<ip:tcp-connection-factory id="csClient"
    type="client"
    host="somehost"
    port="12345"
    single-use="true"
    deserializer="connectionSerializeDeserialize"
/>
<ip:tcp-outbound-channel-adapter id="dmServerOutboundAdapter"
    channel="dmQueryRequestChannel"
    connection-factory="csClient"
    order="2"
/>
<ip:tcp-inbound-channel-adapter id="dmServerInboundAdapter"
    channel="dmQueryResponseChannel"
    connection-factory="csClient"
    error-channel="errorChannel"/>
<channel id="dmQueryResponseChannel"/>
Upvotes: 0
Views: 7697
Reputation: 1
Configuration settings:
<gateway id="clientPositionsGateway" service-interface="com.example.ClientPositionsGateway">
    <method name="fetchClientPositions" request-channel="clientPositionsRequestChannel" reply-channel="dmQueryResponseChannel"/>
</gateway>
<channel id="clientPositionsRequestChannel" />
<splitter input-channel="clientPositionsRequestChannel"
          output-channel="singleClientPositionsRequestChannel" />
<channel id="singleClientPositionsRequestChannel" />
<transformer
    input-channel="singleClientPositionsRequestChannel"
    output-channel="dmQueryRequestChannel"
    ref="dmPosBaseQueryTransformer" />
<logging-channel-adapter channel="clientBytes2StringChannel"/>
<channel id="dmQueryRequestChannel">
    <dispatcher task-executor="executor"/>
</channel>
<ip:tcp-connection-factory id="csClient"
    type="client"
    host="serverHost"
    port="22010"
    single-use="true"
    deserializer="connectionSerializeDeserialize"
/>
<ip:tcp-outbound-channel-adapter id="dmServerOutboundAdapter"
    channel="dmQueryRequestChannel"
    connection-factory="csClient"
/>
<ip:tcp-inbound-channel-adapter id="dmServerInboundAdapter"
    channel="dmQueryResponseChannel"
    connection-factory="csClient"
    error-channel="errorChannel"/>
<transformer input-channel="dmQueryResponseChannel"
             output-channel="clientBytes2StringChannel"
             ref="dmPOSBaseQueryResponseTransformer" />
<channel id="dmQueryResponseChannel"/>
<channel id="clientBytes2StringChannel"/>
Upvotes: 0
Reputation: 1
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:beans="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns="http://www.springframework.org/schema/integration"
    xmlns:ip="http://www.springframework.org/schema/integration/ip"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/ip http://www.springframework.org/schema/integration/ip/spring-integration-ip.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
    <!-- intercept and log every message -->
    <logging-channel-adapter id="logger" level="DEBUG" />
    <wire-tap channel="logger" />
    <gateway id="clientPositionsGateway"
        service-interface="com.example.ClientPositionsGateway">
        <method name="fetchClientPositions" request-channel="clientPositionsRequestChannel" reply-channel="dmQueryResponseChannel"/>
    </gateway>
    <channel id="clientPositionsRequestChannel" />
    <splitter input-channel="clientPositionsRequestChannel"
              output-channel="singleClientPositionsRequestChannel" />
    <channel id="singleClientPositionsRequestChannel" />
    <transformer
        input-channel="singleClientPositionsRequestChannel"
        output-channel="dmQueryRequestChannel"
        ref="dmPosBaseTransQueryTransformer" />
    <channel id="dmQueryRequestChannel">
        <dispatcher task-executor="executor"/>
    </channel>
    <ip:tcp-connection-factory id="csClient"
        type="client"
        host="hostserver"
        port="22010"
        single-use="true"
        deserializer="connectionSerializeDeserialize"
    />
    <ip:tcp-outbound-gateway id="dmServerGateway"
        request-channel="dmQueryRequestChannel"
        reply-channel="dmQueryResponseChannel"
        connection-factory="csClient" />
    <channel id="dmQueryResponseChannel">
        <dispatcher task-executor="executor"/>
    </channel>
    <channel id="serverBytes2StringChannel" />
    <!-- beans: prefix is required because the default namespace is spring-integration -->
    <beans:bean id="connectionSerializeDeserialize" class="com.example.DMQueryResponseSerializer"/>
    <beans:bean id="dmPosBaseTransQueryTransformer" class="com.example.DMPOSBaseTransQueryTransformer"/>
    <task:executor id="executor" pool-size="5"/>
</beans:beans>
Upvotes: 0
Reputation: 1
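Here is the code that solved my problem:
import java.util.Arrays;
import java.util.List;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@ContextConfiguration(locations={"/clientGIM2Position.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class GetClientPositionsTest {
    @Autowired
    ClientPositionsGateway clientPositionsGateway;
    @Test
    public void testGetPositions() throws Exception {
        // clientList was not defined in the original post; these IDs are placeholders
        List<String> clientList = Arrays.asList("client1", "client2");
        String positions = clientPositionsGateway.fetchClientPositions(clientList);
        System.out.println("returned !!!!" + positions);
    }
}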
Upvotes: 0
Reputation: 1
import java.util.List;

public interface ClientPositionsGateway {
    String fetchClientPositions(List<String> clientList);
}
Upvotes: 0
Reputation: 121177
If your clientPositionsGateway is invoked from client threads, there is no reason to use executor channels. If you call clientPositionsGateway in a loop a million times, try a Future (asynchronous) gateway instead: http://docs.spring.io/spring-integration/docs/2.2.5.RELEASE/reference/html/messaging-endpoints-chapter.html#async-gateway and, again, without executor channels. I also don't see a reason to set reply-channel on both gateways.
One more thing: you have a <splitter> before the <ip:tcp-outbound-gateway>, but where is the <aggregator> after the <ip:tcp-outbound-gateway>?
In your current configuration, clientPositionsGateway returns only the first reply. All the others are dropped, because the TemporaryReplyChannel is already closed by then.
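The points above can be sketched as a configuration fragment. This is only an illustration, assuming the bean and channel names from the question, that undeclared channels are created as plain direct channels, and that the sequence headers set by the <splitter> are still present on the replies from the TCP gateway. Note that the default aggregator produces a collection of payloads, so the gateway method's String return type, or a custom aggregator bean, would need to account for that.

```xml
<!-- No reply-channel on the messaging gateway: the final reply is routed
     back through the replyChannel header -->
<gateway id="clientPositionsGateway"
         service-interface="com.example.ClientPositionsGateway"
         default-request-channel="clientPositionsRequestChannel"/>

<splitter input-channel="clientPositionsRequestChannel"
          output-channel="singleClientPositionsRequestChannel"/>

<transformer input-channel="singleClientPositionsRequestChannel"
             output-channel="dmQueryRequestChannel"
             ref="dmPosBaseTransQueryTransformer"/>

<!-- Direct channels only: no task-executor dispatcher anywhere in the flow -->
<ip:tcp-outbound-gateway id="dmServerGateway"
                         request-channel="dmQueryRequestChannel"
                         reply-channel="dmQueryResponseChannel"
                         connection-factory="csClient"/>

<!-- Collects the replies that belong to one split request; with no
     output-channel, the aggregated message goes to the replyChannel header -->
<aggregator input-channel="dmQueryResponseChannel"/>
```

With the aggregator in place, the caller receives one reply per call to fetchClientPositions instead of only the first split result.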
Upvotes: 0
Reputation: 174494
As Artem said in his comment, 'Dispatcher has no subscribers' here means that no endpoint is configured to receive the response on dmQueryResponseChannel, or that the endpoint configured with that channel as its input channel is not started.
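As a rough illustration of what "an endpoint subscribed to the channel" looks like, here is a minimal fragment. The handler bean, its class, and its method name are hypothetical and do not come from the question; any started endpoint with dmQueryResponseChannel as its input channel would serve the same purpose.

```xml
<!-- The TCP inbound adapter publishes server responses to this channel -->
<channel id="dmQueryResponseChannel"/>

<!-- Hypothetical subscriber: gives the dispatcher an endpoint to deliver to -->
<service-activator input-channel="dmQueryResponseChannel"
                   ref="dmResponseHandler"
                   method="handleResponse"/>

<!-- Hypothetical handler class, not part of the original post -->
<beans:bean id="dmResponseHandler" class="com.example.DmResponseHandler"/>
```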
In any case, even when you resolve that, using independent adapters for request/response scenarios is tricky because the framework has no way to automatically correlate the response with the request. That's what the outbound gateway is for. You can use collaborating adapters, but you have to deal with the correlation yourself. If you are using a request/reply gateway to initiate the flow, you will have to use a technique such as the one explored in the tcp-client-server-multiplex sample. This is because using independent adapters means you lose the replyChannel header used to get the response back to the gateway.
Or, you can use a void-returning gateway to send the request, and an <int:outbound-channel-adapter/> so the framework will call back with the response and you can do your own correlation programmatically.
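A minimal sketch of that last option, written without the int: prefix to match the question's default namespace. The ClientPositionsSender interface (with a void method), the DmResponseCallback class, and the onResponse method are hypothetical names used only for illustration; the splitter, transformer, and TCP adapters from the question are assumed to stay as they are.

```xml
<!-- Hypothetical one-way gateway: the interface method returns void,
     so the caller does not wait for a reply -->
<gateway id="clientPositionsSender"
         service-interface="com.example.ClientPositionsSender"
         default-request-channel="clientPositionsRequestChannel"/>

<!-- Responses from the TCP inbound adapter arrive on this channel -->
<channel id="dmQueryResponseChannel"/>

<!-- The framework invokes the callback for each response; matching a
     response to its request is left to the callback's own logic -->
<outbound-channel-adapter channel="dmQueryResponseChannel"
                          ref="dmResponseCallback"
                          method="onResponse"/>

<!-- Hypothetical callback class, not part of the original post -->
<beans:bean id="dmResponseCallback" class="com.example.DmResponseCallback"/>
```

The trade-off is that the response payload itself (or a connection identifier header) has to carry enough information for the callback to work out which request it answers.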
Upvotes: 1