zeh

Reputation: 195

Spring Integration - How to implement asynchronous TCP socket requests/responses over the same connection?

I have a Python TCP Socket server service which:

On the other side, I have a Java Spring Boot client application using Spring Integration. My current TCP socket configuration uses:

@MessagingGateway(defaultRequestChannel = REQUEST_CHANNEL, errorChannel = ERROR_CHANNEL)
public interface ClientGtw {
    Future<Response> send(Request request);
}

@Bean
@ServiceActivator(inputChannel = REQUEST_CHANNEL)
public MessageHandler outboundGateway(TcpNioClientConnectionFactory connectionFactory) {
    TcpOutboundGateway gateway = new TcpOutboundGateway();
    gateway.setConnectionFactory(connectionFactory);
    gateway.setRequestTimeout(TimeUnit.SECONDS.toMillis(timeout));
    gateway.setRemoteTimeout(TimeUnit.SECONDS.toMillis(timeout));
    return gateway;
}

@Bean
public TcpNioClientConnectionFactory clientConnectionFactory(AppConfig config) {    
    Host host = getHost(config);

    TcpNioClientConnectionFactory factory = new TcpNioClientConnectionFactory(host.name, host.port);
    factory.setSingleUse(false);
    factory.setSoTimeout((int) TimeUnit.SECONDS.toMillis(timeout));

    SerializerDeserializer sd = new SerializerDeserializer();
    factory.setDeserializer(sd);
    factory.setSerializer(sd);
    return factory;
}

The current approach works, but each request blocks the connection until its response is received. This is a problem because some requests take a long time to be answered while other incoming requests could be answered sooner. I would like to send requests and receive responses independently of each other (decoupled), as many as possible. The transported (serialized and deserialized) object contains a key pair that can be used to correlate them.

TL;DR: How can I implement asynchronous requests/responses over the same connection?

The Spring TcpOutboundGateway javadoc mentions: Use a pair of outbound/inbound adapters for that use case.

So, in addition to the declaration above:

1st Attempt

@Bean
public TcpInboundGateway inboundGateway(AbstractServerConnectionFactory connectionFactory) {
    TcpInboundGateway gateway = new TcpInboundGateway();
    gateway.setConnectionFactory(connectionFactory);
    gateway.setRequestTimeout(TimeUnit.SECONDS.toMillis(timeout));
    return gateway;
}

@Bean
public AbstractServerConnectionFactory serverFactory(AppConfig config) {
    Host host = getHost(config);
    AbstractServerConnectionFactory connectionFactory = new TcpNetServerConnectionFactory(host.port);
    connectionFactory.setSingleUse(true);
    connectionFactory.setSoTimeout(timeout);
    return connectionFactory;
}

As before, the requests are blocked until a response is delivered.

2nd Attempt

@Bean
public TcpInboundGateway inboundGateway(TcpNioClientConnectionFactory connectionFactory) {
    TcpInboundGateway gateway = new TcpInboundGateway();
    gateway.setConnectionFactory(connectionFactory);
    gateway.setRequestTimeout(TimeUnit.SECONDS.toMillis(timeout));
    gateway.setClientMode(true);
    return gateway;
}

org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory may only be used by one inbound adapter

Any clue?

Upvotes: 2

Views: 3636

Answers (2)

zeh

Reputation: 195

Gary, thanks for your guidance.

To solve this issue, it is important to first understand the Messaging Channel types.

So, in the configurer class:

@Bean(name = REQUEST_CHANNEL)
public DirectChannel sender() {
    return new DirectChannel();
}

@Bean(name = RESPONSE_CHANNEL)
public PollableChannel receiver() {
    return new QueueChannel();
}

@Bean
@ServiceActivator(inputChannel = REQUEST_CHANNEL)
public TcpSendingMessageHandler outboundClient(TcpNioClientConnectionFactory connectionFactory) {
    TcpSendingMessageHandler outbound = new TcpSendingMessageHandler();
    outbound.setConnectionFactory(connectionFactory);
    outbound.setRetryInterval(TimeUnit.SECONDS.toMillis(timeout));
    outbound.setClientMode(true);
    return outbound;
}

@Bean
public TcpReceivingChannelAdapter inboundClient(TcpNioClientConnectionFactory connectionFactory) {
    TcpReceivingChannelAdapter inbound = new TcpReceivingChannelAdapter();
    inbound.setConnectionFactory(connectionFactory);
    inbound.setRetryInterval(TimeUnit.SECONDS.toMillis(timeout));
    inbound.setOutputChannel(receiver());
    inbound.setClientMode(true);
    return inbound;
}

This scratch @Singleton class illustrates how to handle the requests and responses (assuming that requests and responses contain a UID to correlate them):

@Autowired
private DirectChannel sender;

@Autowired
private PollableChannel receiver;

private BlockingQueue<Request> requestPool = new LinkedBlockingQueue<>();

private Map<String, Response> responsePool = Collections.synchronizedMap(new HashMap<>());

@PostConstruct
private void init() {
    new Receiver().start();
    new Sender().start();
}

/*
 * Can be called as many times as necessary without blocking on a response
 */
public void send(Request req) {
    requestPool.add(req);
}

/*
 * Poll this until a response arrives or the socket timeout elapses;
 * returns null while the response is still pending
 */
public Response receive(String key) {
    // remove() returns the mapped response (or null) and clears the entry in one step
    return responsePool.remove(key);
}

private class Receiver extends Thread {
    @Override
    public void run() {
        while (true) {
            try {
                tcpReceive();
                Thread.sleep(250);
            } catch (InterruptedException e) { }
        }
    }
    private void tcpReceive() {
        // receive() returns the Message wrapper, so unwrap the payload
        Message<?> msg = receiver.receive();
        if (msg != null) {
            Response res = (Response) msg.getPayload();
            responsePool.put(res.getUID(), res);
        }
    }
}

private class Sender extends Thread {
    @Override
    public void run() {
        while (true) {
            try {
                tcpSend();
                Thread.sleep(250);
            } catch (InterruptedException e) { }
        }
    }
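    // poll(timeout, unit) can be interrupted; run() already handles the exception
    private void tcpSend() throws InterruptedException {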
        Request req = requestPool.poll(125, TimeUnit.MILLISECONDS);
        if (req != null) {
            sender.send(MessageBuilder.withPayload(req).build());
        }
    }
}
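
One way to avoid polling responsePool, sketched here with plain JDK classes only, is to keep one CompletableFuture per request UID and complete it when the matching response arrives. ReplyCorrelator and its method names are hypothetical (they are not part of Spring Integration), and orTimeout requires Java 9 or later:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: matches asynchronous replies to pending requests by UID. */
class ReplyCorrelator<R> {

    // One pending future per in-flight request UID.
    private final Map<String, CompletableFuture<R>> pending = new ConcurrentHashMap<>();

    /** Call before sending the request; the future completes when the reply arrives. */
    CompletableFuture<R> register(String uid, long timeout, TimeUnit unit) {
        CompletableFuture<R> future = new CompletableFuture<>();
        pending.put(uid, future);
        // Fail the future with a TimeoutException if no reply arrives in time.
        future.orTimeout(timeout, unit);
        // Drop the map entry however the future ends (reply, timeout, cancellation).
        future.whenComplete((reply, error) -> pending.remove(uid, future));
        return future;
    }

    /** Call from the receiving side; returns false for an unknown or expired UID. */
    boolean complete(String uid, R reply) {
        CompletableFuture<R> future = pending.remove(uid);
        return future != null && future.complete(reply);
    }

    /** Number of requests still waiting for a reply. */
    int inFlight() {
        return pending.size();
    }
}
```

With something like this, send(...) would call register(...) with the request's UID before handing the request to the sender channel (assuming Request exposes the same UID as Response), and the Receiver thread would call complete(res.getUID(), res) instead of responsePool.put(...). A slow reply then never holds up a fast one, because each caller waits only on its own future.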

UPDATED

I forgot to mention this:

@Bean
public TcpNioClientConnectionFactory clientConnectionFactory(Config config) {
    // Get host properties
    Host host = getHost(config);
    // Create socket factory
    TcpNioClientConnectionFactory factory = new TcpNioClientConnectionFactory(host.name, host.port);
    factory.setSingleUse(false); // IMPORTANT FOR SINGLE CHANNEL
    factory.setSoTimeout((int) TimeUnit.SECONDS.toMillis(timeout));
    return factory;
}

Feel free to share any comments.

Upvotes: 1

Gary Russell

Reputation: 174759

Use a pair of channel adapters instead of an outbound gateway. Instead of using a MessagingGateway, you can do the correlation yourself in your application, or you can use the same technique as is used in the tcp-client-server-multiplex sample app. It uses an aggregator to aggregate a copy of the outbound message with an inbound message, replying to the gateway.

It's old, and uses XML configuration, but the same techniques apply.

<publish-subscribe-channel id="input" />

<ip:tcp-outbound-channel-adapter id="outAdapter.client"
    order="2"
    channel="input"
    connection-factory="client" /> <!-- Collaborator -->

<!-- Also send a copy to the custom aggregator for correlation and
     so this message's replyChannel will be transferred to the
     aggregated message.
     The order ensures this gets to the aggregator first -->
<bridge input-channel="input" output-channel="toAggregator.client"
        order="1"/>

<!-- Asynch receive reply -->
<ip:tcp-inbound-channel-adapter id="inAdapter.client"
    channel="toAggregator.client"
    connection-factory="client" /> <!-- Collaborator -->

<!-- dataType attribute invokes the conversion service, if necessary -->
<channel id="toAggregator.client" datatype="java.lang.String" />

<aggregator input-channel="toAggregator.client"
    output-channel="toTransformer.client"
    expire-groups-upon-completion="true"
    expire-groups-upon-timeout="true"
    discard-channel="noResponseChannel"
    group-timeout="1000"
    correlation-strategy-expression="payload.substring(0,3)"
    release-strategy-expression="size() == 2" />

<channel id="noResponseChannel" />

<service-activator input-channel="noResponseChannel" ref="echoService" method="noResponse" />

<transformer input-channel="toTransformer.client"
    expression="payload.get(1)"/> <!-- The response is always second -->

(This simple sample correlates on the first 3 bytes).
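
To make the aggregator's role concrete, here is a plain-Java model of what the aggregator and transformer above do with each message. It is only an illustration, not Spring Integration code: PrefixCorrelator is a hypothetical name, and the group timeout and discard channel are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Plain-Java model of the aggregator + transformer pair (illustration only). */
class PrefixCorrelator {

    // Open groups, keyed by the first 3 characters of the payload.
    private final Map<String, List<String>> groups = new HashMap<>();

    /**
     * Feed the copy of each outbound request and each inbound reply through here.
     * Returns the reply once both messages with the same prefix have arrived.
     */
    synchronized Optional<String> accept(String payload) {
        String key = payload.substring(0, 3);          // correlation-strategy-expression
        List<String> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(payload);
        if (group.size() == 2) {                       // release-strategy-expression
            groups.remove(key);                        // expire-groups-upon-completion
            return Optional.of(group.get(1));          // transformer: payload.get(1)
        }
        return Optional.empty();                       // still waiting for the reply
    }
}
```

For example, feeding "001 ping", "002 ping", "002 pong", "001 pong" in that order releases "002 pong" on the third call and "001 pong" on the fourth, so replies may arrive in any order. This relies on the request copy reaching the aggregator before the reply, which is what the order attributes in the XML are for.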

Upvotes: 1
