Reputation: 570
My TCP Server based on spring integration is working great.
It also handles reply-timeout at the server if the service takes long. When the service takes longer than reply-timeout set in it will send the message to error channel and that in turn sends back error message to the client.
Here is my code:
<int:transformer id="errorHandler"
input-channel="errorChannel"
ref="myTransformer" method="transform" />
<bean id="myTransformer" class="com.sample.MyTransformer" />
public class MyTransformer {
private static Logger logger = Logger.getLogger(MyTransformer.class);
public String transform(org.springframework.integration.handler.ReplyRequiredException e) {
logger.error("timeout exception is thrown");
return "Error in processing request.";
}
}
The above code works and the client gets 'Error in processing request' and the server log has the entry 'timeout exception is thrown'. But I also see the following exception in the log:
2016-06-30 16:25:27,827 ERROR [org.springframework.integration.handler.LoggingHandler] org.springframework.integration.handler.ReplyRequiredException: No reply produced by handler 'org.springframework.integration.config.ServiceActivatorFactoryBean#0', and its 'requiresReply' property is set to true.
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:150)
at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:42)
at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97)
at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:422)
at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:390)
at org.springframework.integration.ip.tcp.TcpInboundGateway.doOnMessage(TcpInboundGateway.java:119)
at org.springframework.integration.ip.tcp.TcpInboundGateway.onMessage(TcpInboundGateway.java:97)
at org.springframework.integration.ip.tcp.connection.TcpNetConnection.run(TcpNetConnection.java:182)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
Implementation of MyTransformer does not seem to be correct.
Can you please help on how to customize transformer? How to get the payload in the transform method so that I can reply to client as 'Error in processing request. payload = ' + payload?
Thanks
UPDATE: To avoid logging ReplyRequiredException, I changed the error-channel on tcp-inbound-gateway as follows:
<int-ip:tcp-inbound-gateway id="gatewayCrLf"
connection-factory="crLfServer"
request-channel="requestChannel"
error-channel="tcpErrorChannel"
reply-timeout="10000"
/>
<int:channel id="tcpErrorChannel" />
<int:service-activator input-channel="requestChannel" ref="gateway" requires-reply="true"/>
<int:gateway id="gateway" default-request-channel="timeoutChannel" default-reply-timeout="10000" />
<int:object-to-string-transformer id="serverBytes2String"
input-channel="timeoutChannel"
output-channel="serviceChannel"/>
<int:channel id="timeoutChannel">
<int:dispatcher task-executor="timeoutExecutor"/>
</int:channel>
<bean id="timeoutExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="5" />
<property name="maxPoolSize" value="100" />
<property name="queueCapacity" value="50" />
</bean>
But I am getting MessageDeliveryException. tcpErrorChannel setup seems to be wrong. Can you please suggest? Here is the Stacktrace:
2016-07-05 12:17:34,266 ERROR [org.springframework.integration.ip.tcp.connection.TcpNetConnection] Exception sending message: GenericMessage [payload=byte[67], headers={timestamp=1467735444239, id=30eb099e-955d-1bd3-1789-49aa9fc84b6f, ip_tcp_remotePort=64055, ip_address=127.0.0.1, ip_localInetAddress=/127.0.0.1, ip_hostname=127.0.0.1, ip_connectionId=127.0.0.1:64055:5678:908d39a1-d027-4753-b144-59b9c0390fd7}]
org.springframework.messaging.MessagingException: failure occurred in error-handling flow; nested exception is org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'org.springframework.context.support.FileSystemXmlApplicationContext@4876db09.tcpErrorChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:452)
at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:390)
at org.springframework.integration.ip.tcp.TcpInboundGateway.doOnMessage(TcpInboundGateway.java:119)
at org.springframework.integration.ip.tcp.TcpInboundGateway.onMessage(TcpInboundGateway.java:97)
at org.springframework.integration.ip.tcp.connection.TcpNetConnection.run(TcpNetConnection.java:182)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'org.springframework.context.support.FileSystemXmlApplicationContext@4876db09.tcpErrorChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:150)
at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:42)
at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97)
at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:449)
... 7 more
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:153)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
... 14 more
The above exception will go away by changing errorChennel to tcpErrorChannel in :
Thanks a lot to Artem for his perfect solution.
Upvotes: 2
Views: 2001
Reputation: 121177
Looks like it is continue of your other question reply-timeout meaning in tcp-inbound-gateway in spring integration.
Well, ReplyRequiredException
is as result of the <int:service-activator input-channel="requestChannel" ref="gateway" requires-reply="true"/>
.
requires-reply="true"
- please, read its description. You can play with that though and make it as false
(default one), but then your logic for "timeout" won't work. So, you don't have choice unless continue to live with that:
ERROR [org.springframework.integration.handler.LoggingHandler] org.springframework.integration.handler.ReplyRequiredException
Although, that is just affect of one of subscribers for the default errorChannel
: http://docs.spring.io/spring-integration/reference/html/configuration.html#namespace-errorhandler.
Now regarding your last question. All the exception are wrapped into ErrorMessage
and sent into that errorChannel
. Typically exceptions are wrapped into MessagingException
, e.g. in this our case it is like:
else if (this.requiresReply && !isAsync()) {
throw new ReplyRequiredException(message, "No reply produced by handler '" +
getComponentName() + "', and its 'requiresReply' property is set to true.");
}
Pay attention into that message
ctor argument it is exactly a requestMessage
which is without reply in our scenario. And you can extract a desired payload
from it.
So, now you can get failedMessage
from the ReplyRequiredException
in your transform()
and extract its payload
for your goal.
UPDATE
Since this
ReplyRequiredException
is a known exception, there is no need to see it as Error.
Well, even it is a known exception, but it is an important one in other components and scenarios when it may not be appropriate do not receive a reply. The ReplyRequiredException
is very common through the Framework components, so moving it to different category could be a big architectural error for your application.
To avoid those logs you just should not use a default errorChannel
, but some other one from your TCP Inbound Gateway. Something like error-channel="tcpErrorChannel"
and that's all. A default errorChannel
will still have that LoggingHandler
as a subscriber, but you already won't send error messages to that channel.
Upvotes: 1