Reputation: 27
As @Gary Russell suggested in this link, I extended TcpOutboundGateway so that it can receive messages that the TCP server sends without any request.
Here is my custom TcpOutboundGateway. If the message payload contains "freqID", it sends the message to a separate MessageChannel:
    public class ExtendedTcpOutboundGateway extends TcpOutboundGateway {

        private final DirectChannel unsolicitedMessageChannel;

        public ExtendedTcpOutboundGateway(DirectChannel unsolicitedMessageChannel) {
            this.unsolicitedMessageChannel = unsolicitedMessageChannel;
        }

        @Override
        public boolean onMessage(Message<?> message) {
            if (isUnsolicitedMessage((Message<byte[]>) message)) {
                this.messagingTemplate.send(this.unsolicitedMessageChannel, message);
                return false;
            } else {
                return super.onMessage(message);
            }
        }

        private boolean isUnsolicitedMessage(Message<byte[]> message) {
            byte[] payloadByte = message.getPayload();
            String payloadString = new String(payloadByte);
            System.out.println(payloadString);
            return payloadString.contains("freqID");
        }
    }
Below is the dynamic TCP routing flow code. I added a parameter named "unsolicitedMessageChannelName" (read from the message headers), which is used to create a DirectChannel with that id. That DirectChannel is then passed to the constructor of ExtendedTcpOutboundGateway to handle data that the TCP server sends without any request:
    private MessageChannel createNewSubflow(Message<?> message) {
        String host = (String) message.getHeaders().get("host");
        Integer port = (Integer) message.getHeaders().get("port");
        String unsolicitedMessageChannelName = (String) message.getHeaders().get("unsolicitedMessageChannelName");
        Assert.state(host != null && port != null, "host and/or port header missing");
        String hostPort = host + port;

        TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
        cf.setLeaveOpen(true);
        ByteArrayCrLfSerializer byteArrayCrLfSerializer = new ByteArrayCrLfSerializer();
        byteArrayCrLfSerializer.setMaxMessageSize(1048576);
        cf.setSerializer(byteArrayCrLfSerializer);
        cf.setDeserializer(byteArrayCrLfSerializer);

        DirectChannel directChannel = MessageChannels.direct(unsolicitedMessageChannelName).get();
        ExtendedTcpOutboundGateway tcpOutboundGateway = new ExtendedTcpOutboundGateway(directChannel);
        tcpOutboundGateway.setConnectionFactory(cf);

        IntegrationFlow flow = f -> f.handle(tcpOutboundGateway);
        IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
                this.flowContext.registration(flow)
                        .addBean(cf)
                        .id(hostPort + ".flow")
                        .register();

        MessageChannel inputChannel = flowRegistration.getInputChannel();
        this.subFlows.put(hostPort, inputChannel);
        return inputChannel;
    }
Here is the code for the ServiceActivator and the TCP client:
    @Service
    public class PeriodicalData implements PeriodicalDataService {

        public void setPeriodicalDataOrder(some parameters) {
            String unsolicitedMessageChannelName = "unsolicitedMessageChannelName_Test";
            byte[] result = tcpClientGateway.send(data, ip, port, unsolicitedMessageChannelName);
            String response = new String(result);
            System.out.println("Here response for request data : " + response + " received");
        }

        @ServiceActivator(inputChannel = "unsolicitedMessageChannelName_Test")
        public void handle(String in) {
            System.out.println("Here unsolicitedMessageChannel data : " + in + " received");
        }
    }
When I use this combination, I get the following exception:
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'unsolicitedMessageChannelName_Test'.
I do not understand why I get this exception, because a similar (though not identical) usage works fine in this link.
I think there is a problem with my design or with the way I use the ServiceActivator. What should I do to get rid of the exception? Any suggestions?
Upvotes: 0
Views: 450
Reputation: 121550
In your createNewSubflow() you do this:
    DirectChannel directChannel = MessageChannels.direct(unsolicitedMessageChannelName).get();
and you don't register it as a bean in the application context. Therefore this object is not tied to the mentioned @ServiceActivator, and it indeed has no subscribers at runtime. You don't even need to create that object. What you need is to take the existing bean for that channel from the application context, if you really want to deliver the message to that @ServiceActivator.
Consider injecting a BeanFactory into the component that has createNewSubflow() and calling its getBean(unsolicitedMessageChannelName, DirectChannel.class) to get access to the real bean, which has the appropriate service activator subscriber.
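To illustrate why a freshly created channel has no subscribers even when it shares a name with an existing bean, here is a minimal plain-Java sketch. It does not use Spring: ToyChannel and ToyRegistry are hypothetical stand-ins for DirectChannel and the application context, and the error message is only imitated. The point it models is that subscribers belong to a specific channel object, so only a lookup by name returns the object the service activator subscribed to.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Simplified model only: these classes are NOT Spring Integration types.
class ChannelLookupSketch {

    static final String NAME = "unsolicitedMessageChannelName_Test";

    /** Hypothetical stand-in for a DirectChannel: delivers to its own subscribers or fails. */
    static class ToyChannel {
        private final String name;
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        ToyChannel(String name) {
            this.name = name;
        }

        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        void send(String payload) {
            if (subscribers.isEmpty()) {
                throw new IllegalStateException(
                        "Dispatcher has no subscribers for channel '" + name + "'");
            }
            subscribers.forEach(s -> s.accept(payload));
        }
    }

    /** Hypothetical stand-in for the application context: bean name -> channel instance. */
    static class ToyRegistry {
        private final Map<String, ToyChannel> beans = new HashMap<>();

        ToyChannel register(String name) {
            return beans.computeIfAbsent(name, ToyChannel::new);
        }

        ToyChannel getBean(String name) {
            ToyChannel channel = beans.get(name);
            if (channel == null) {
                throw new IllegalArgumentException("No bean named '" + name + "'");
            }
            return channel;
        }
    }

    /** Looks the channel up by name, so the message reaches the registered subscriber. */
    static boolean deliversViaRegistry() {
        ToyRegistry context = new ToyRegistry();
        List<String> received = new ArrayList<>();
        context.register(NAME).subscribe(received::add); // plays the role of the service activator
        context.getBean(NAME).send("freqID=1");
        return received.size() == 1 && "freqID=1".equals(received.get(0));
    }

    /** Creates a new channel with the same name; it is a different object with no subscribers. */
    static boolean deliversViaFreshChannel() {
        ToyRegistry context = new ToyRegistry();
        context.register(NAME).subscribe(payload -> { });
        ToyChannel fresh = new ToyChannel(NAME);
        try {
            fresh.send("freqID=1");
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("registry lookup delivered: " + deliversViaRegistry());
        System.out.println("fresh channel delivered: " + deliversViaFreshChannel());
    }
}
```

In the real code, the equivalent change would be to replace the MessageChannels.direct(...).get() call in createNewSubflow() with the getBean(unsolicitedMessageChannelName, DirectChannel.class) lookup on an injected BeanFactory, as described above.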
Upvotes: 1