Andrey
Andrey

Reputation: 11

Spring Integration Stateful TCP Connection

I'm trying to create a TCP client with org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory. When Factory createing TCP connection I'll need send some data to server, like as authorization. Server sent some data in response(e.g. salt). In next request will need to sent salt to server. The salt must be different for all connections in Pool and I think Connection must store salt inself.

So, my implementation is...
Spring Beans XML:

<int-ip:tcp-connection-factory id="client"
                                   type="client"
                                   host="localhost"
                                   port="12345"
                                   interceptor-factory-chain="customInterceptorFactory"
                                   mapper="mapper" />

    <bean id="customInterceptorFactory" class="org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactoryChain">
        <property name="interceptors">
            <bean class="ru.example.gateway.StatefulTcpConnectionFactory" />
        </property>
    </bean>
    <bean id="cachedClient" class="org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory">
        <constructor-arg ref="client" />
        <constructor-arg value="5" />
    </bean>

    <int:channel id="clientRequestChannel"/>

    <int-ip:tcp-outbound-gateway id="clientCrLf"
                                connection-factory="cachedClient"
                                request-channel="clientRequestChannel"/>

    <int:converter ref="byteArrayToStringConverter" />
    <bean id="byteArrayToStringConverter" class="ru.example.gateway.ByteArrayToStringConverter">
        <property name="charset" value="windows-1251" />
    </bean>
    <bean id="mapper" class="org.springframework.integration.ip.tcp.connection.TcpMessageMapper">
        <property name="charset" value="windows-1251" />
    </bean>

TCP connection interceptor:

package ru.example.gateway;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.serializer.Serializer;
import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorSupport;
import org.springframework.integration.ip.tcp.connection.TcpConnectionSupport;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;

import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

@Slf4j
@Getter
public class StatefulTcpConnection extends TcpConnectionInterceptorSupport {

    private volatile String nextSalt = "";
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final CountDownLatch latch = new CountDownLatch(1);

    public StatefulTcpConnection() {
    }

    public StatefulTcpConnection(ApplicationEventPublisher applicationEventPublisher) {
        super(applicationEventPublisher);
    }

    @Override
    public void send(Message<?> message) throws Exception {
        if (!initialized.get()) {
            throw new Exception("Connection not initialized");
        }
        Message<?> newMessage = message;
        Object payload = message.getPayload();
        log.debug("Send Payload({})", payload.getClass());
        if (payload instanceof String) {
            new GenericMessage<String>((String) payload + ";Salt=" + nextSalt + ";", message.getHeaders());
        }
        super.send(newMessage);
    }

    /**
     * Invoke initialize after connection wrapped in ConnectionFactory4Interceptors
     * @see org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory#initializeConnection(TcpConnectionSupport, Socket)
     * @param serializer
     */
    @Override
    public void setSerializer(Serializer<?> serializer) {
        super.setSerializer(serializer);
        try {
            initialize();
        } catch (Exception e) {
            throw new RuntimeException("Connection couldn't initialize", e);
        }
    }

    private void initialize() throws Exception {
        GenericMessage<String> loginMessage = new GenericMessage<>("Login");
        super.send(loginMessage);
        log.debug("Waiting initializing response");
        latch.await(10, TimeUnit.SECONDS);
    }

    @Override
    public boolean onMessage(Message<?> message) {
        Object messagePayload = message.getPayload();
        log.debug("onMessage Payload({})", messagePayload.getClass());
        messagePayload = new String((byte[]) messagePayload);
        if (messagePayload instanceof String) {
            String payload = (String) messagePayload,
                salt;
            int saltStart = payload.indexOf(";Salt=");
            if (saltStart >= 0) {
                int saltEnd = payload.indexOf(";", saltStart + 1);
                if (saltEnd >= 0) {
                    salt = payload.substring(saltStart + 6, saltEnd);
                } else {
                    salt = payload.substring(saltStart + 6);
                }
                nextSalt = salt;
            }
            if (!initialized.get()) {
                initialized.set(true);
                latch.countDown();
                log.debug("Initializing complete.");
            }
        }
        return super.onMessage(message);
    }

    @Override
    public String getConnectionId() {
        return "Stateful:" + super.getConnectionId();
    }

    @Override
    public String toString() {
        return getConnectionId();
    }

}

Gateway:

package ru.example.gateway;

import org.springframework.integration.annotation.MessagingGateway;

@MessagingGateway(name = "clientOutGateway",
    defaultRequestChannel = "clientRequestChannel"
)
public interface StringGateway {

    String send(String message);

}

And service for Spring MVC:

package ru.example.service.impl;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import ru.example.gateway.StringGateway;
import ru.example.service.MessageService;

@Service
public class MessageServiceImpl implements MessageService {

    @Autowired
    private StringGateway clientGateway;

    public String getMessage(String code) {
        return clientGateway.send(code);
    }

}

I have some errors. In first request from MessageServiceImpl I gained response for "authentication request" and needed response lost, or throw Exception, or error message in log:

ERROR org.springframework.integration.ip.tcp.TcpOutboundGateway - Cannot correlate response - no pending reply for Cached:Stateful:localhost:12345:56060:6993fc83-7e69-4f18-9300-8553e6d74a4f

Are someone have solution for Stateful Connection?

Thanks! p.s.: sorry for my English

Upvotes: 1

Views: 1607

Answers (1)

Gary Russell
Gary Russell

Reputation: 174494

There is currently no stateful connection management provided by the framework. You could write a wrapper for the connection factory that stores the connection in a ThreadLocal but you would need some mechanism to clean up (and release it) when you are done with the connection.

If it's just a simple handshake you might be able to use connection interceptors; there's a test case (see HelloWorldInterceptor) but it's a bit complicated.

I would think a custom connection factory wrapper would be simpler.

Upvotes: 1

Related Questions