Reputation: 11
I'm trying to create a TCP client with org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory. When Factory createing TCP connection I'll need send some data to server, like as authorization. Server sent some data in response(e.g. salt). In next request will need to sent salt to server. The salt must be different for all connections in Pool and I think Connection must store salt inself.
So, my implementation is...
Spring Beans XML:
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="12345"
interceptor-factory-chain="customInterceptorFactory"
mapper="mapper" />
<bean id="customInterceptorFactory" class="org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactoryChain">
<property name="interceptors">
<bean class="ru.example.gateway.StatefulTcpConnectionFactory" />
</property>
</bean>
<bean id="cachedClient" class="org.springframework.integration.ip.tcp.connection.CachingClientConnectionFactory">
<constructor-arg ref="client" />
<constructor-arg value="5" />
</bean>
<int:channel id="clientRequestChannel"/>
<int-ip:tcp-outbound-gateway id="clientCrLf"
connection-factory="cachedClient"
request-channel="clientRequestChannel"/>
<int:converter ref="byteArrayToStringConverter" />
<bean id="byteArrayToStringConverter" class="ru.example.gateway.ByteArrayToStringConverter">
<property name="charset" value="windows-1251" />
</bean>
<bean id="mapper" class="org.springframework.integration.ip.tcp.connection.TcpMessageMapper">
<property name="charset" value="windows-1251" />
</bean>
TCP connection interceptor:
package ru.example.gateway;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.serializer.Serializer;
import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorSupport;
import org.springframework.integration.ip.tcp.connection.TcpConnectionSupport;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@Getter
public class StatefulTcpConnection extends TcpConnectionInterceptorSupport {
private volatile String nextSalt = "";
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final CountDownLatch latch = new CountDownLatch(1);
public StatefulTcpConnection() {
}
public StatefulTcpConnection(ApplicationEventPublisher applicationEventPublisher) {
super(applicationEventPublisher);
}
@Override
public void send(Message<?> message) throws Exception {
if (!initialized.get()) {
throw new Exception("Connection not initialized");
}
Message<?> newMessage = message;
Object payload = message.getPayload();
log.debug("Send Payload({})", payload.getClass());
if (payload instanceof String) {
new GenericMessage<String>((String) payload + ";Salt=" + nextSalt + ";", message.getHeaders());
}
super.send(newMessage);
}
/**
* Invoke initialize after connection wrapped in ConnectionFactory4Interceptors
* @see org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory#initializeConnection(TcpConnectionSupport, Socket)
* @param serializer
*/
@Override
public void setSerializer(Serializer<?> serializer) {
super.setSerializer(serializer);
try {
initialize();
} catch (Exception e) {
throw new RuntimeException("Connection couldn't initialize", e);
}
}
private void initialize() throws Exception {
GenericMessage<String> loginMessage = new GenericMessage<>("Login");
super.send(loginMessage);
log.debug("Waiting initializing response");
latch.await(10, TimeUnit.SECONDS);
}
@Override
public boolean onMessage(Message<?> message) {
Object messagePayload = message.getPayload();
log.debug("onMessage Payload({})", messagePayload.getClass());
messagePayload = new String((byte[]) messagePayload);
if (messagePayload instanceof String) {
String payload = (String) messagePayload,
salt;
int saltStart = payload.indexOf(";Salt=");
if (saltStart >= 0) {
int saltEnd = payload.indexOf(";", saltStart + 1);
if (saltEnd >= 0) {
salt = payload.substring(saltStart + 6, saltEnd);
} else {
salt = payload.substring(saltStart + 6);
}
nextSalt = salt;
}
if (!initialized.get()) {
initialized.set(true);
latch.countDown();
log.debug("Initializing complete.");
}
}
return super.onMessage(message);
}
@Override
public String getConnectionId() {
return "Stateful:" + super.getConnectionId();
}
@Override
public String toString() {
return getConnectionId();
}
}
Gateway:
package ru.example.gateway;
import org.springframework.integration.annotation.MessagingGateway;
@MessagingGateway(name = "clientOutGateway",
defaultRequestChannel = "clientRequestChannel"
)
public interface StringGateway {
String send(String message);
}
And service for Spring MVC:
package ru.example.service.impl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import ru.example.gateway.StringGateway;
import ru.example.service.MessageService;
@Service
public class MessageServiceImpl implements MessageService {
@Autowired
private StringGateway clientGateway;
public String getMessage(String code) {
return clientGateway.send(code);
}
}
I have some errors. In first request from MessageServiceImpl I gained response for "authentication request" and needed response lost, or throw Exception, or error message in log:
ERROR org.springframework.integration.ip.tcp.TcpOutboundGateway - Cannot correlate response - no pending reply for Cached:Stateful:localhost:12345:56060:6993fc83-7e69-4f18-9300-8553e6d74a4f
Are someone have solution for Stateful Connection?
Thanks! p.s.: sorry for my English
Upvotes: 1
Views: 1607
Reputation: 174494
There is currently no stateful connection management provided by the framework. You could write a wrapper for the connection factory that stores the connection in a ThreadLocal
but you would need some mechanism to clean up (and release it) when you are done with the connection.
If it's just a simple handshake you might be able to use connection interceptors; there's a test case (see HelloWorldInterceptor) but it's a bit complicated.
I would think a custom connection factory wrapper would be simpler.
Upvotes: 1