Reputation: 261
I'm trying to implement the following scenario using Spring Integration:
I need a client to connect to a server via TCP IP and wait to receive messages within 30 seconds. I need a server to send 0 to n messages to the client which had connected. I need a way to start and stop channel transfer without loss of messages. I need to change the port the server is listening between stop and start.
This is my config so far:
@Configuration
public class TcpConfiguration {
private static Logger LOG = LoggerFactory.getLogger(TcpConfiguration.class);
@Value("${port}")
private Integer port;
@Value("${so-timeout}")
private Integer soTimeout;
@Value("${keep-alive}")
private Boolean keepAlive;
@Value("${send-timeout}")
private Integer sendTimeout;
@Bean
public AbstractServerConnectionFactory getMyConnFactory() {
LOG.debug("getMyConnFactory");
TcpNetServerConnectionFactory factory = new TcpNetServerConnectionFactory(port);
LOG.debug("getMyConnFactory port={}", port);
factory.setSoTimeout(soTimeout);
LOG.debug("getMyConnFactory soTimeout={}", soTimeout);
factory.setSoKeepAlive(true);
LOG.debug("getMyConnFactory keepAlive={}", keepAlive);
return factory;
}
@Bean
public AbstractEndpoint getMyChannelAdapter() {
LOG.debug("getMyChannelAdapter");
TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
adapter.setConnectionFactory(getMyConnFactory());
adapter.setOutputChannel(myChannelIn());
adapter.setSendTimeout(sendTimeout);
LOG.debug("getMyChannelAdapter adapter={}", adapter.getClass().getName());
return adapter;
}
@Bean
public MessageChannel myChannelIn() {
LOG.debug("myChannelIn");
return new DirectChannel();
}
@Bean
@Transformer(inputChannel = "myChannelIn", outputChannel = "myServiceChannel")
public ObjectToStringTransformer myTransformer() {
LOG.debug("myTransformer");
return new ObjectToStringTransformer();
}
@ServiceActivator(inputChannel = "myServiceChannel")
public void service(String in) {
LOG.debug("service received={}", in);
}
@Bean
public MessageChannel myChannelOut() {
LOG.debug("myChannelOut");
return new DirectChannel();
}
@Bean
public IntegrationFlow myOutbound() {
LOG.debug("myOutbound");
return IntegrationFlows.from(myChannelOut())
.handle(mySender())
.get();
}
@Bean
public MessageHandler mySender() {
LOG.debug("mySender");
TcpSendingMessageHandler tcpSendingMessageHandler = new TcpSendingMessageHandler();
tcpSendingMessageHandler.setConnectionFactory(getMyConnFactory());
return tcpSendingMessageHandler;
}
}
Please advice!
To change the server port I would shutdown the application context and restart it after configuring the new port in a remote configuration server. Can I just close the application context without corrupting the current message transfer? I don't know how to handle the connect-only client thing.
Upvotes: 0
Views: 631
Reputation: 174494
Use dynamic flow registration; just get the connection to open it without sending.
@SpringBootApplication
public class So62867670Application {
public static void main(String[] args) {
SpringApplication.run(So62867670Application.class, args);
}
@Bean
public ApplicationRunner runner(DynamicTcpReceiver receiver) {
return args -> { // Just a demo to show starting/stopping
receiver.connectAndListen(1234);
System.in.read();
receiver.stop();
System.in.read();
receiver.connectAndListen(1235);
System.in.read();
receiver.stop();
};
}
}
@Component
class DynamicTcpReceiver {
@Autowired
private IntegrationFlowContext context;
private IntegrationFlowRegistration registration;
public void connectAndListen(int port) throws InterruptedException {
TcpClientConnectionFactorySpec client = Tcp.netClient("localhost", port)
.deserializer(TcpCodecs.lf());
IntegrationFlow flow = IntegrationFlows.from(Tcp.inboundAdapter(client))
.transform(Transformers.objectToString())
.handle(System.out::println)
.get();
this.registration = context.registration(flow).register();
client.get().getConnection(); // just open the single shared connection
}
public void stop() {
if (this.registration != null) {
this.registration.destroy();
this.registration = null;
}
}
}
EDIT
And this is the server side...
@SpringBootApplication
@EnableScheduling
public class So62867670ServerApplication {
public static void main(String[] args) {
SpringApplication.run(So62867670ServerApplication.class, args);
}
@Bean
public ApplicationRunner runner(DynamicTcpServer receiver) {
return args -> { // Just a demo to show starting/stopping
receiver.tcpListen(1234);
System.in.read();
receiver.stop(1234);
System.in.read();
receiver.tcpListen(1235);
System.in.read();
receiver.stop(1235);
};
}
}
@Component
class DynamicTcpServer {
private static final Logger LOG = LoggerFactory.getLogger(DynamicTcpServer.class);
@Autowired
private IntegrationFlowContext flowContext;
@Autowired
private ApplicationContext appContext;
private final Map<Integer, IntegrationFlowRegistration> registrations = new HashMap<>();
private final Map<String, Entry<Integer, AtomicInteger>> clients = new ConcurrentHashMap<>();
public void tcpListen(int port) {
TcpServerConnectionFactorySpec server = Tcp.netServer(port)
.id("server-" + port)
.serializer(TcpCodecs.lf());
server.get().registerListener(msg -> false); // dummy listener so the accept thread doesn't exit
IntegrationFlow flow = f -> f.handle(Tcp.outboundAdapter(server));
this.registrations.put(port, flowContext.registration(flow).register());
}
public void stop(int port) {
IntegrationFlowRegistration registration = this.registrations.remove(port);
if (registration != null) {
registration.destroy();
}
}
@EventListener
public void closed(TcpConnectionOpenEvent event) {
LOG.info(event.toString());
String connectionId = event.getConnectionId();
String[] split = connectionId.split(":");
int port = Integer.parseInt(split[2]);
this.clients.put(connectionId, new AbstractMap.SimpleEntry<>(port, new AtomicInteger()));
}
@EventListener
public void closed(TcpConnectionCloseEvent event) {
LOG.info(event.toString());
this.clients.remove(event.getConnectionId());
}
@EventListener
public void listening(TcpConnectionServerListeningEvent event) {
LOG.info(event.toString());
}
@Scheduled(fixedDelay = 5000)
public void sender() {
this.clients.forEach((connectionId, portAndCount) -> {
IntegrationFlowRegistration registration = this.registrations.get(portAndCount.getKey());
if (registration != null) {
LOG.info("Sending to " + connectionId);
registration.getMessagingTemplate().send(MessageBuilder.withPayload("foo")
.setHeader(IpHeaders.CONNECTION_ID, connectionId).build());
if (portAndCount.getValue().incrementAndGet() > 9) {
this.appContext.getBean("server-" + portAndCount.getKey(), TcpNetServerConnectionFactory.class)
.closeConnection(connectionId);
}
}
});
}
}
Upvotes: 1