Reputation: 13
Thanks in advance! I am working on a simple TCP integration where the goal is to have many clients connected to the server and to facilitate asynchronous messages between the client and server. I have pieced together some code from examples and everything is working as expected except one thing:
When one connection stalls (not disconnects, but does something like drop all packets), all the other connections stop receiving messages until the stalled one recovers. What should I implement or change so that communication with each client is independent of connection problems with other clients?
Server config is as follows:
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter;
import org.springframework.integration.ip.tcp.TcpSendingMessageHandler;
import org.springframework.integration.ip.tcp.connection.*;
import org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer;
import org.springframework.integration.ip.tcp.serializer.ByteArrayLfSerializer;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@EnableIntegration
@Configuration
public class IntegrationConfig {
private static final Logger log = Logger.getLogger(IntegrationConfig.class);
@Value("${listen.port:8000}")
private int port;
@Autowired
Relayer relayer;
@Bean //for accepting text message from TCP, putty
public MessageChannel fromTcp() {
return new ExecutorChannel(threadPoolTaskExecutor());
}
@Bean //inbound, it is working, I could read the inbound message while debugging
public TcpReceivingChannelAdapter in(
AbstractServerConnectionFactory connectionFactory) {
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setOutputChannel(fromTcp());
adapter.setConnectionFactory(connectionFactory);
return adapter;
}
@Bean
public TaskScheduler taskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(100);
return taskScheduler;
}
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setMaxPoolSize(100);
threadPoolTaskExecutor.setCorePoolSize(100);
return threadPoolTaskExecutor;
}
@Bean
public MessageChannel toTcp() {
ExecutorChannel directChannel = new ExecutorChannel(threadPoolTaskExecutor());
relayer.setOutboundChannel(directChannel);
return directChannel;
}
@ServiceActivator(inputChannel = "toTcp")
@Bean
public TcpSendingMessageHandler out(
AbstractServerConnectionFactory connectionFactory) {
TcpSendingMessageHandler tcpOutboundAdp = new TcpSendingMessageHandler();
tcpOutboundAdp.setConnectionFactory(connectionFactory);
return tcpOutboundAdp;
}
@ServiceActivator(inputChannel = "fromTcp")
public void handleIncompingMessage(Message<byte[]> stringMsg) {
String new_message = new String(stringMsg.getPayload());
new_message = new_message.replaceAll("\r", "");
new_message = new_message.replaceAll("\n", "");
relayer.processIncomingMessage((String) stringMsg.getHeaders().get("ip_connectionId"), new_message);
}
@Bean
public AbstractServerConnectionFactory serverCF() {
TcpNetServerConnectionFactory tcpNetServerConnectionFactory = new TcpNetServerConnectionFactory(this.port);
tcpNetServerConnectionFactory.setSingleUse(false);
tcpNetServerConnectionFactory.setSerializer(new ByteArrayCrLfSerializer());
tcpNetServerConnectionFactory.setDeserializer(new ByteArrayLfSerializer());
return tcpNetServerConnectionFactory;
}
@EventListener
public void onApplicationEvent(TcpConnectionOpenEvent event) {
relayer.newConnection(event.getConnectionId());
}
@EventListener
public void onApplicationEvent(TcpConnectionCloseEvent event) {
relayer.deleteConnection(event.getConnectionId());
}
}
This is based on many examples, and again, it all works except when one client lags out. Other parts of my app also send to the output channel; should I have an aggregator of some sort in the middle?
Cheers!
Upvotes: 1
Views: 831
Reputation: 174484
Something is not adding up; threadPoolTaskExecutor-9 is blocked trying to send data to the socket; the connection object is locked...
- locked <0x0000000746902810> (a org.springframework.integration.ip.tcp.connection.TcpNetConnection)
9 other threads are waiting to acquire that lock (send() is synchronized), so it appears you are attempting to write multiple messages to the same socket concurrently.
So it's not a problem with sending to other sockets; it's a problem sending to that one socket (presumably because its buffers are full).
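To illustrate how a synchronized send on one stalled socket could hold up unrelated sends, here is a minimal plain-JDK sketch. It is not Spring Integration code: FakeConnection and SharedPoolStarvationDemo are hypothetical names, the stall is simulated with a latch, and the sketch assumes enough sends are queued for the stalled socket to occupy every thread in the shared pool, which may or may not be what is happening in your application.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for a TCP connection whose send() is synchronized.
class FakeConnection {
    private final CountDownLatch unblock; // null means the peer is healthy

    FakeConnection(CountDownLatch unblock) {
        this.unblock = unblock;
    }

    // Like a blocking socket write: a stalled peer holds the caller and the lock.
    synchronized void send(String payload) throws InterruptedException {
        if (unblock != null) {
            unblock.await();
        }
    }
}

class SharedPoolStarvationDemo {

    // Returns true if a send to a healthy connection could not run
    // while every pool thread was tied up by the stalled connection.
    static boolean healthySendIsStarved(int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch recover = new CountDownLatch(1);
        FakeConnection stalled = new FakeConnection(recover);
        FakeConnection healthy = new FakeConnection(null);
        try {
            // One thread blocks inside send(); the rest wait for the monitor.
            for (int i = 0; i < poolSize; i++) {
                pool.submit(() -> { stalled.send("msg"); return null; });
            }
            // No free thread is left, so this task sits in the pool's queue.
            Future<?> healthySend = pool.submit(() -> { healthy.send("hello"); return null; });
            boolean starved;
            try {
                healthySend.get(200, TimeUnit.MILLISECONDS);
                starved = false;
            } catch (TimeoutException e) {
                starved = true;
            }
            recover.countDown();                  // the stalled peer recovers
            healthySend.get(5, TimeUnit.SECONDS); // now the healthy send completes
            return starved;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("healthy send starved: " + healthySendIsStarved(4));
    }
}
```

Running main prints "healthy send starved: true". With a pool much larger than the number of pending sends for the stalled socket, the healthy send would go through, which is why the thread dump alone does not fully explain the symptom.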
EDIT
Here's one way to perform all outbound sends on a single thread per socket...
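import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import javax.net.SocketFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.ip.IpHeaders;
import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter;
import org.springframework.integration.ip.tcp.TcpSendingMessageHandler;
import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpConnectionCloseEvent;
import org.springframework.integration.ip.tcp.connection.TcpNetServerConnectionFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;
@SpringBootApplication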
public class So46917862Application {
private static final Logger LOGGER = LoggerFactory.getLogger(So46917862Application.class);
public static void main(String[] args) {
SpringApplication.run(So46917862Application.class, args).close();
}
@Bean
public ApplicationRunner runner() {
return args -> {
Socket socket = SocketFactory.getDefault().createSocket("localhost", 1234);
socket.getOutputStream().write("foo\r\n".getBytes());
socket.getOutputStream().write("bar\r\n".getBytes());
Thread.sleep(10_000);
socket.close();
};
}
@Bean
public TcpReceivingChannelAdapter adapter() {
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setConnectionFactory(server());
adapter.setOutputChannel(inbound());
return adapter;
}
@Bean
public MessageChannel inbound() {
return new DirectChannel();
}
@ServiceActivator(inputChannel = "inbound")
@Bean
public MessageHandler asyncResponder() {
SimpleAsyncTaskExecutor exec = new SimpleAsyncTaskExecutor();
return m ->
exec.execute(() -> {
LOGGER.info("Initiating on this thread");
toTcp().send(new GenericMessage<>("FOO", m.getHeaders()));
});
}
@Bean
public AbstractServerConnectionFactory server() {
return new TcpNetServerConnectionFactory(1234);
}
@ServiceActivator(inputChannel = "outbound")
@Bean
public TcpSendingMessageHandler handler() {
TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
handler.setConnectionFactory(server());
return handler;
}
@Bean
public MessageChannel toTcp() {
return new DirectChannel();
}
@Bean
public MessageChannel outbound() {
return new DirectChannel();
}
@Bean
public SingleThreadPerConnection sender() {
return new SingleThreadPerConnection(outbound());
}
public static class SingleThreadPerConnection implements ApplicationListener<TcpConnectionCloseEvent> {
private static final Logger LOGGER = LoggerFactory.getLogger(SingleThreadPerConnection.class);
private final Map<String, ThreadPoolTaskExecutor> executors = new HashMap<>();
private final MessagingTemplate messagingTemplate;
public SingleThreadPerConnection(MessageChannel channel) {
this.messagingTemplate = new MessagingTemplate(channel);
}
@Override
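public synchronized void onApplicationEvent(TcpConnectionCloseEvent event) {
// A connection that never had an outbound send has no executor, so guard against null.
ThreadPoolTaskExecutor exec = this.executors.remove(event.getConnectionId());
if (exec != null) {
exec.shutdown();
LOGGER.info("Removed executor for " + event.getConnectionId());
}
}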
@ServiceActivator(inputChannel = "toTcp")
public void sendToThread(final Message<?> message) {
executorFor((String) message.getHeaders().get(IpHeaders.CONNECTION_ID))
.execute(() -> {
LOGGER.info("Sending on this thread");
this.messagingTemplate.send(message);
});
}
private synchronized TaskExecutor executorFor(String connectionId) {
Assert.state(connectionId != null, "No connection id header present");
if (this.executors.get(connectionId) == null) {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setThreadNamePrefix(connectionId + "-exec-");
exec.initialize();
this.executors.put(connectionId, exec);
}
return this.executors.get(connectionId);
}
}
}
Result:
2017-10-25 09:37:16.250 INFO 54983 --- [cTaskExecutor-1] com.example.So46917862Application : Initiating on this thread
2017-10-25 09:37:16.250 INFO 54983 --- [cTaskExecutor-2] com.example.So46917862Application : Initiating on this thread
2017-10-25 09:37:16.253 INFO 54983 --- [20b926b7-exec-1] 862Application$SingleThreadPerConnection : Sending on this thread
2017-10-25 09:37:16.253 INFO 54983 --- [20b926b7-exec-1] 862Application$SingleThreadPerConnection : Sending on this thread
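The same single-thread-per-connection idea can be sketched without Spring, which may make the isolation easier to see. This is only an illustration under stated assumptions: PerConnectionSender and PerConnectionSenderDemo are hypothetical names, the blocking write is a plain callback, and the stalled client is simulated with a latch that is never released.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical helper: one single-threaded executor per connection id,
// so a blocked write only ever delays sends to that same connection.
class PerConnectionSender {
    private final Map<String, ExecutorService> executors = new ConcurrentHashMap<>();
    private final BiConsumer<String, String> writer; // (connectionId, payload) -> blocking write

    PerConnectionSender(BiConsumer<String, String> writer) {
        this.writer = writer;
    }

    void send(String connectionId, String payload) {
        executors.computeIfAbsent(connectionId, id -> Executors.newSingleThreadExecutor())
                 .execute(() -> writer.accept(connectionId, payload));
    }

    // Call when the connection closes; safe if no executor was ever created.
    void close(String connectionId) {
        ExecutorService exec = executors.remove(connectionId);
        if (exec != null) {
            exec.shutdownNow();
        }
    }
}

class PerConnectionSenderDemo {

    // Returns what was delivered while the "stalled" connection was still blocked.
    static List<String> deliveredWhileOneClientStalls() {
        CountDownLatch neverReleased = new CountDownLatch(1);
        CountDownLatch healthyDone = new CountDownLatch(2);
        List<String> delivered = new CopyOnWriteArrayList<>();
        PerConnectionSender sender = new PerConnectionSender((id, payload) -> {
            if (id.equals("stalled")) {
                try {
                    neverReleased.await(); // simulated blocked socket write
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            delivered.add(id + ":" + payload);
            if (id.equals("healthy")) {
                healthyDone.countDown();
            }
        });
        try {
            sender.send("stalled", "a");
            sender.send("stalled", "b");
            sender.send("healthy", "x");
            sender.send("healthy", "y");
            if (!healthyDone.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("healthy sends timed out");
            }
            return new ArrayList<>(delivered);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            sender.close("stalled");
            sender.close("healthy");
        }
    }

    public static void main(String[] args) {
        System.out.println(deliveredWhileOneClientStalls());
    }
}
```

Running main prints [healthy:x, healthy:y]: both sends to the healthy connection complete, in order, while the stalled connection is still blocked. Note that each executor's queue is unbounded here, so messages for a client that never recovers would accumulate until the connection is closed.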
Upvotes: 1