Reputation: 195
I have a Python TCP Socket server service which:
On the other side, I have a Java Spring Boot client application using Spring Integration. My current TCP socket configuration is:
@MessagingGateway(defaultRequestChannel = REQUEST_CHANNEL, errorChannel = ERROR_CHANNEL)
public interface ClientGtw {
Future<Response> send(Request request);
}
@Bean
@ServiceActivator(inputChannel = REQUEST_CHANNEL)
public MessageHandler outboundGateway(TcpNioClientConnectionFactory connectionFactory) {
TcpOutboundGateway gateway = new TcpOutboundGateway();
gateway.setConnectionFactory(connectionFactory);
gateway.setRequestTimeout(TimeUnit.SECONDS.toMillis(timeout));
gateway.setRemoteTimeout(TimeUnit.SECONDS.toMillis(timeout));
return gateway;
}
@Bean
public TcpNioClientConnectionFactory clientConnectionFactory(AppConfig config) {
Host host = getHost(config);
TcpNioClientConnectionFactory factory = new TcpNioClientConnectionFactory(host.name, host.port);
factory.setSingleUse(false);
factory.setSoTimeout((int) TimeUnit.SECONDS.toMillis(timeout));
SerializerDeserializer sd = new SerializerDeserializer();
factory.setDeserializer(sd);
factory.setSerializer(sd);
return factory;
}
This approach works, but when a request is sent it holds the connection until a response is received. This is a problem because a request can sometimes take a long time to get a response, while other incoming requests could be answered faster. I would like to send and receive as many requests and responses as possible independently (decoupled from each other). The transported object (serialized and deserialized) contains a key pair that can be used to correlate each response with its request.
TL;DR: How can I implement asynchronous requests/responses over the same connection?
The Spring TcpOutboundGateway javadoc mentions: Use a pair of outbound/inbound adapters for that use case.
So, in addition to the declaration above:
1st Attempt
@Bean
public TcpInboundGateway inboundGateway(AbstractServerConnectionFactory connectionFactory) {
TcpInboundGateway gateway = new TcpInboundGateway();
gateway.setConnectionFactory(connectionFactory);
gateway.setRequestTimeout(TimeUnit.SECONDS.toMillis(timeout));
return gateway;
}
@Bean
public AbstractServerConnectionFactory serverFactory(AppConfig config) {
Host host = getHost(config);
AbstractServerConnectionFactory connectionFactory = new TcpNetServerConnectionFactory(host.port);
connectionFactory.setSingleUse(true);
connectionFactory.setSoTimeout(timeout);
return connectionFactory;
}
The requests are blocked until a response is delivered as before.
2nd Attempt
@Bean
public TcpInboundGateway inboundGateway(TcpNioClientConnectionFactory connectionFactory) {
TcpInboundGateway gateway = new TcpInboundGateway();
gateway.setConnectionFactory(connectionFactory);
gateway.setRequestTimeout(TimeUnit.SECONDS.toMillis(timeout));
gateway.setClientMode(true);
return gateway;
}
This fails with: org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory may only be used by one inbound adapter
Any clue?
Upvotes: 2
Views: 3636
Reputation: 195
Gary, thanks for your guidance.
To solve this issue, it is important to first understand the message channel types.
So, in the configurer class:
@Bean(name = REQUEST_CHANNEL)
public DirectChannel sender() {
return new DirectChannel();
}
@Bean(name = RESPONSE_CHANNEL)
public PollableChannel receiver() {
return new QueueChannel();
}
@Bean
@ServiceActivator(inputChannel = REQUEST_CHANNEL)
public TcpSendingMessageHandler outboundClient(TcpNioClientConnectionFactory connectionFactory) {
TcpSendingMessageHandler outbound = new TcpSendingMessageHandler();
outbound.setConnectionFactory(connectionFactory);
outbound.setRetryInterval(TimeUnit.SECONDS.toMillis(timeout));
outbound.setClientMode(true);
return outbound;
}
@Bean
public TcpReceivingChannelAdapter inboundClient(TcpNioClientConnectionFactory connectionFactory) {
TcpReceivingChannelAdapter inbound = new TcpReceivingChannelAdapter();
inbound.setConnectionFactory(connectionFactory);
inbound.setRetryInterval(TimeUnit.SECONDS.toMillis(timeout));
inbound.setOutputChannel(receiver());
inbound.setClientMode(true);
return inbound;
}
This scratch @Singleton class illustrates how to handle the requests and responses (assuming that requests and responses contain a UID to correlate them):
@Autowired
private DirectChannel sender;
@Autowired
private PollableChannel receiver;
private BlockingQueue<Request> requestPool = new LinkedBlockingQueue<>();
private Map<String, Response> responsePool = Collections.synchronizedMap(new HashMap<>());
@PostConstruct
private void init() {
new Receiver().start();
new Sender().start();
}
/*
* It can be called as many times as necessary without waiting for a response
*/
public void send(Request req) {
requestPool.add(req);
}
/*
* Returns the response for the given key, or null if it has not arrived yet
*/
public Response receive(String key) {
Response res = responsePool.get(key);
if (res != null) {
responsePool.remove(key);
}
return res;
}
private class Receiver extends Thread {
@Override
public void run() {
while (true) {
try {
tcpReceive();
Thread.sleep(250);
} catch (InterruptedException e) { }
}
}
private void tcpReceive() {
// receive() returns the Message wrapper; the Response is its payload
Message<?> msg = receiver.receive();
if (msg != null) {
Response res = (Response) msg.getPayload();
responsePool.put(res.getUID(), res);
}
}
}
private class Sender extends Thread {
@Override
public void run() {
while (true) {
try {
tcpSend();
Thread.sleep(250);
} catch (InterruptedException e) { }
}
}
private void tcpSend() throws InterruptedException {
Request req = requestPool.poll(125, TimeUnit.MILLISECONDS);
if (req != null) {
sender.send(MessageBuilder.withPayload(req).build());
}
}
}
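As a possible variation on the polling `receive(String key)` above, the correlation can also be sketched with one `CompletableFuture` per pending UID, so that callers are notified when their reply arrives instead of checking a map. This is only a minimal sketch in plain Java: `ResponseCorrelator` and its methods are hypothetical names, not part of Spring Integration, and it assumes Java 9+ for `orTimeout`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: matches responses to pending requests by UID. */
class ResponseCorrelator<R> {

    // One pending future per request UID that has not been answered yet.
    private final Map<String, CompletableFuture<R>> pending = new ConcurrentHashMap<>();

    /** Call before sending the request; the future completes when the reply arrives. */
    CompletableFuture<R> register(String uid, long timeout, TimeUnit unit) {
        CompletableFuture<R> future = new CompletableFuture<>();
        pending.put(uid, future);
        // Fail the future if no reply arrives in time, and always drop the entry.
        return future.orTimeout(timeout, unit)
                     .whenComplete((response, error) -> pending.remove(uid, future));
    }

    /** Call from the receiving side; returns false if nothing is waiting for this UID. */
    boolean complete(String uid, R response) {
        CompletableFuture<R> future = pending.remove(uid);
        return future != null && future.complete(response);
    }

    /** Number of requests still waiting for a reply. */
    int pendingCount() {
        return pending.size();
    }
}
```

With something like this, `send` would call `register(req.getUID(), ...)` before handing the request to the `sender` channel, and `tcpReceive` would call `complete(res.getUID(), res)` instead of writing to `responsePool`. A slow reply then never delays a fast one, because each caller waits only on its own future.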
UPDATED
I forgot to mention this:
@Bean
public TcpNioClientConnectionFactory clientConnectionFactory(Config config) {
// Get host properties
Host host = getHost(config);
// Create socket factory
TcpNioClientConnectionFactory factory = new TcpNioClientConnectionFactory(host.name, host.port);
factory.setSingleUse(false); // IMPORTANT FOR SINGLE CHANNEL
factory.setSoTimeout((int) TimeUnit.SECONDS.toMillis(timeout));
return factory;
}
Feel free to comment or suggest improvements.
Upvotes: 1
Reputation: 174759
Use a pair of channel adapters instead of an outbound gateway. Instead of using a MessagingGateway, you can do the correlation yourself in your application, or you can use the same technique as is used in the tcp-client-server-multiplex sample app. It uses an aggregator to aggregate a copy of the outbound message with an inbound message, replying to the gateway.
It's old, and uses XML configuration, but the same techniques apply.
<publish-subscribe-channel id="input" />
<ip:tcp-outbound-channel-adapter id="outAdapter.client"
order="2"
channel="input"
connection-factory="client" /> <!-- Collaborator -->
<!-- Also send a copy to the custom aggregator for correlation and
so this message's replyChannel will be transferred to the
aggregated message.
The order ensures this gets to the aggregator first -->
<bridge input-channel="input" output-channel="toAggregator.client"
order="1"/>
<!-- Asynch receive reply -->
<ip:tcp-inbound-channel-adapter id="inAdapter.client"
channel="toAggregator.client"
connection-factory="client" /> <!-- Collaborator -->
<!-- dataType attribute invokes the conversion service, if necessary -->
<channel id="toAggregator.client" datatype="java.lang.String" />
<aggregator input-channel="toAggregator.client"
output-channel="toTransformer.client"
expire-groups-upon-completion="true"
expire-groups-upon-timeout="true"
discard-channel="noResponseChannel"
group-timeout="1000"
correlation-strategy-expression="payload.substring(0,3)"
release-strategy-expression="size() == 2" />
<channel id="noResponseChannel" />
<service-activator input-channel="noResponseChannel" ref="echoService" method="noResponse" />
<transformer input-channel="toTransformer.client"
expression="payload.get(1)"/> <!-- The response is always second -->
(This simple sample correlates on the first 3 bytes).
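To make the aggregator settings above easier to follow, here is a rough plain-Java sketch of the same logic: group messages by the first three characters, release a group once it holds two messages, and return the second one. `PrefixCorrelator` is a hypothetical stand-in written for illustration, not a Spring Integration class, and it leaves out the group timeout and the discard channel.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for the aggregator + transformer pair in the sample. */
class PrefixCorrelator {

    // Messages seen so far, grouped by correlation key.
    private final Map<String, List<String>> groups = new HashMap<>();

    /**
     * Accepts either the copy of an outbound request or an inbound reply.
     * Returns the reply once both halves of a group have arrived.
     */
    synchronized Optional<String> accept(String payload) {
        String key = payload.substring(0, 3);        // correlation strategy
        List<String> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(payload);
        if (group.size() == 2) {                     // release strategy
            groups.remove(key);                      // expire the group upon completion
            return Optional.of(group.get(1));        // the response is always second
        }
        return Optional.empty();
    }
}
```

In the XML, the `order` attributes are what guarantee that the request copy reaches the aggregator before the reply can, which is why taking the second element is safe. The sketch relies on the caller to preserve that ordering.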
Upvotes: 1