Reputation: 1234
I'm writing a RabbitMQ interface using Spring Integration for a legacy TCP service. I can connect to the legacy service and send an XML string message.
The legacy server replies with an XML message. When the reply arrives, a custom deserializer reads most of it, up to the very last line of the response, and then the call fails with a MessageTimeoutException. I have set breakpoints to verify this is happening. Is there anything obvious that I am doing wrong?
JUnit Test
@RunWith(SpringRunner.class)
@SpringBootTest
public class MaasIntegrationHelloWorld {
@Autowired
private TCPGateway gateway;
@Test
public void getAnalysisEngineInfo() throws JAXBException, ConnectException {
//Actual XML message creation and serialization removed for brevity
String message = "Hello World";
String result = gateway.send(message);
assertEquals(" ", result);
}
}
TCPConfig
@EnableIntegration
@IntegrationComponentScan
@ComponentScan
@Configuration
public class TCPConfig {
@Value("${legacy.hostname}")
private String hostname;
@Value("${legacy.port}")
private int port;
@Bean(name="replyChannel")
public MessageChannel replyChannel() {
return new DirectChannel();
}
@Bean(name="sendChannel")
public MessageChannel sendChannel() {
return new DirectChannel();
}
@Bean
public TcpNetClientConnectionFactory connectionFactory() {
TcpNetClientConnectionFactory connectionFactory = new TcpNetClientConnectionFactory(hostname, port);
connectionFactory.setDeserializer(new CommandResultDeserializer());
connectionFactory.setSingleUse(false);
return connectionFactory;
}
@Bean
@ServiceActivator(inputChannel = "sendChannel")
public TcpOutboundGateway tcpOutboundGateway() {
TcpOutboundGateway tcpOutboundGateway = new TcpOutboundGateway();
tcpOutboundGateway.setConnectionFactory(connectionFactory());
tcpOutboundGateway.setReplyChannel(this.replyChannel());
tcpOutboundGateway.setRequiresReply(true);
return tcpOutboundGateway;
}
}
Gateway interface
@MessagingGateway(defaultRequestChannel = "sendChannel")
public interface TCPGateway {
String send(String message) throws ConnectException;
}
Gateway Listener
@Configuration
@EnableIntegration
@IntegrationComponentScan
@MessageEndpoint
public class TCPGatewayListener {
@ServiceActivator(inputChannel = "replyChannel")
public String replyHandler(byte[] b) {
return new String(b);
}
}
Deserializer
public String deserialize(InputStream inputStream) throws IOException {
BufferedReader br = null;
StringBuilder sb = new StringBuilder();
String line;
br = new BufferedReader(new InputStreamReader(inputStream));
while ((line = br.readLine()) != null) {
sb.append(line);
}
return sb.toString();
}
Stack trace
org.springframework.integration.MessageTimeoutException: Timed out waiting for response
at org.springframework.integration.ip.tcp.TcpOutboundGateway.handleRequestMessage(TcpOutboundGateway.java:146) ~[spring-integration-ip-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) [spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:150) [spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:45) [spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:42) [spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:38) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:79) [spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:70) [spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:449) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceive(MessagingGatewaySupport.java:422) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:478) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:433) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:424) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.gateway.GatewayCompletableFutureProxyFactoryBean.invoke(GatewayCompletableFutureProxyFactoryBean.java:65) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) [spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213) [spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at com.sun.proxy.$Proxy107.send(Unknown Source) [na:na]
at orgl.shelly.maas.MaasIntegrationHelloWorld.getAnalysisEngineInfo(MaasIntegrationHelloWorld.java:49) [test-classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_151]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_151]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_151]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_151]
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) [junit-4.12.jar:4.12]
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) [junit-4.12.jar:4.12]
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) [junit-4.12.jar:4.12]
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) [junit-4.12.jar:4.12]
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75) [spring-test-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86) [spring-test-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84) [spring-test-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) [junit-4.12.jar:4.12]
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252) [spring-test-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94) [spring-test-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) [junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) [junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) [junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) [junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) [junit-4.12.jar:4.12]
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61) [spring-test-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70) [spring-test-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.junit.runners.ParentRunner.run(ParentRunner.java:363) [junit-4.12.jar:4.12]
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191) [spring-test-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.junit.runner.JUnitCore.run(JUnitCore.java:137) [junit-4.12.jar:4.12]
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) [junit-rt.jar:na]
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) [junit-rt.jar:na]
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) [junit-rt.jar:na]
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) [junit-rt.jar:na]
Upvotes: 0
Views: 699
Reputation: 174494
while ((line = br.readLine()) != null) {
sb.append(line);
}
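That's an odd deserializer - how do you know when the response message is complete? readLine() only returns null at end of stream, that is, when the server closes the socket. If the server keeps the connection open, the loop blocks after the last line and the gateway times out waiting for the reply.
If the server replies with a multi-line response, you need some way to know when the response is complete; for example, the last line might be zero length.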
Or
if (line.equals("</endofdoc>")) {
break;
}
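To make the end-marker idea concrete, here is a minimal sketch of a deserializer that returns as soon as the closing tag has been read, instead of waiting for end of stream. It is an illustration under assumptions, not the actual fix for this service: `</endofdoc>` is only the placeholder used above and should be replaced with the real closing tag of the legacy response, and the class name `EndMarkerDeserializer` is hypothetical. It reads one byte at a time so that nothing beyond the marker is consumed from a connection that stays open. To plug it into the connection factory, the class would presumably implement `org.springframework.core.serializer.Deserializer<String>`; that is left out here so the sketch compiles with the JDK alone.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: reads one reply that is terminated by a known closing tag.
final class EndMarkerDeserializer {

    // Assumption: placeholder marker taken from the answer; use the real closing tag.
    private static final byte[] END_MARKER =
            "</endofdoc>".getBytes(StandardCharsets.UTF_8);

    static String deserialize(InputStream inputStream) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        int matched = 0; // number of marker bytes matched so far
        int b;
        while ((b = inputStream.read()) != -1) {
            buffer.write(b); // line breaks are kept, unlike the readLine() loop
            if (b == END_MARKER[matched]) {
                matched++;
                if (matched == END_MARKER.length) {
                    // Complete message received: return without waiting for EOF.
                    return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
                }
            } else {
                // Simple restart; sufficient only because the marker's first
                // byte ('<') does not occur again inside the marker itself.
                matched = (b == END_MARKER[0]) ? 1 : 0;
            }
        }
        // The peer closed the socket before a complete message arrived.
        throw new IOException("Stream closed before the end marker was received");
    }
}
```

With this approach the read returns when the marker arrives, so the gateway gets its reply even though the server leaves the socket open. Bytes after the marker stay in the stream for the next read.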
Upvotes: 1