Reputation: 389
I have the following annotation-based configuration in a Spring Integration app:
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller(@Value("${poll-interval}") int pollInterval) {
return Pollers.fixedRate(pollInterval).maxMessagesPerPoll(1).get();
}
@Bean
@InboundChannelAdapter(value = "httpRequestChannel", poller = @Poller(PollerMetadata.DEFAULT_POLLER),
autoStartup = "${auto-start:true}")
public MessageSource<String> httpRequestTrigger() {
Expression expression = new SpelExpressionParser().parseExpression(
"#{@timeService.getLastQueryTime()}",
new TemplateParserContext("#{", "}"));
return new ExpressionEvaluatingMessageSource<>(expression, String.class);
}
@Bean
@Qualifier("httpRequestChannel")
public MessageChannel httpRequestChannel() {
return new RendezvousChannel();
}
@Bean
@ServiceActivator(inputChannel = "httpRequestChannel", poller = @Poller(fixedRate="100"))
public MessageHandler httpRequestExecutingMessageHandler(
URI backendURI,
@Qualifier("httpReplyChannel") MessageChannel httpReplyChannel,
RestTemplate restTemplate,
@Qualifier("filterExpression") Expression filterExpression
) {
HttpRequestExecutingMessageHandler messageHandler = new HttpRequestExecutingMessageHandler(
backendURI.toString() + "?filter={filter}", restTemplate);
messageHandler.setHttpMethod(HttpMethod.GET);
messageHandler.setExpectedResponseType(String.class);
messageHandler.setOutputChannel(httpReplyChannel);
messageHandler.setExpectReply(true);
messageHandler.setSendTimeout(1000);
Map<String, Expression> uriVariableExpressions = new HashMap<>();
uriVariableExpressions.put("filter", filterExpression);
messageHandler.setUriVariableExpressions(uriVariableExpressions);
return messageHandler;
}
@Bean
@Qualifier("httpReplyChannel")
public MessageChannel httpReplyChannel() {
return new DirectChannel();
}
In my application.properties
file, auto-start
is set to true
and poll-interval
is set to 2000.
Sometimes when starting up the application, the poller will work as expected, firing every poll-interval
milliseconds. However, just as often, when starting up, the poller will fire once and never fire again. Is there an error in my configuration? How do I get the poller to consistently work?
EDIT: I took a thread dump when the poller hanged:
"task-scheduler-1@4820" prio=5 tid=0x13 nid=NA waiting
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Unsafe.java:-1)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:458)
at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
at java.util.concurrent.SynchronousQueue.put(SynchronousQueue.java:877)
at org.springframework.integration.channel.QueueChannel.doSend(QueueChannel.java:93)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:210)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:272)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
"http-nio-8081-ClientPoller-1@5021" daemon prio=5 tid=0x18 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:-1)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x13b0> (a sun.nio.ch.EPollSelectorImpl)
- locked <0x13b1> (a java.util.Collections$UnmodifiableSet)
- locked <0x13b2> (a sun.nio.ch.Util$3)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.tomcat.util.net.NioEndpoint$Poller.run(NioEndpoint.java:790)
at java.lang.Thread.run(Thread.java:745)
"http-nio-8081-ClientPoller-0@5017" daemon prio=5 tid=0x17 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:-1)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x13b3> (a sun.nio.ch.EPollSelectorImpl)
- locked <0x13b4> (a java.util.Collections$UnmodifiableSet)
- locked <0x13b5> (a sun.nio.ch.Util$3)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.tomcat.util.net.NioEndpoint$Poller.run(NioEndpoint.java:790)
at java.lang.Thread.run(Thread.java:745)
"NioBlockingSelector.BlockPoller-1@5010" daemon prio=5 tid=0x16 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:-1)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x13b6> (a sun.nio.ch.EPollSelectorImpl)
- locked <0x13b7> (a java.util.Collections$UnmodifiableSet)
- locked <0x13b8> (a sun.nio.ch.Util$3)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.tomcat.util.net.NioBlockingSelector$BlockPoller.run(NioBlockingSelector.java:339)
"task-scheduler-3@4847" prio=5 tid=0x15 nid=NA waiting
java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Unsafe.java:-1)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
"http-nio-8081-Acceptor-0@5022" daemon prio=5 tid=0x19 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.ServerSocketChannelImpl.accept0(ServerSocketChannelImpl.java:-1)
at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
- locked <0x13af> (a java.lang.Object)
at org.apache.tomcat.util.net.NioEndpoint$Acceptor.run(NioEndpoint.java:456)
at java.lang.Thread.run(Thread.java:745)
"task-scheduler-2@4837" prio=5 tid=0x14 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(SocketInputStream.java:-1)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
- locked <0x13b9> (a java.io.BufferedInputStream)
at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:704)
at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1569)
- locked <0x13ba> (a sun.net.www.protocol.http.HttpURLConnection)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1474)
at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
at org.springframework.http.client.SimpleBufferingClientHttpRequest.executeInternal(SimpleBufferingClientHttpRequest.java:84)
at org.springframework.http.client.AbstractBufferingClientHttpRequest.executeInternal(AbstractBufferingClientHttpRequest.java:48)
at org.springframework.http.client.AbstractClientHttpRequest.execute(AbstractClientHttpRequest.java:53)
at org.springframework.http.client.InterceptingClientHttpRequest$InterceptingRequestExecution.execute(InterceptingClientHttpRequest.java:93)
at org.howdoevenexist.AuthInterceptor.intercept(AuthInterceptor.java:24)
at org.springframework.http.client.InterceptingClientHttpRequest$InterceptingRequestExecution.execute(InterceptingClientHttpRequest.java:85)
at org.springframework.http.client.InterceptingClientHttpRequest.executeInternal(InterceptingClientHttpRequest.java:69)
at org.springframework.http.client.AbstractBufferingClientHttpRequest.executeInternal(AbstractBufferingClientHttpRequest.java:48)
at org.springframework.http.client.AbstractClientHttpRequest.execute(AbstractClientHttpRequest.java:53)
at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:619)
at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:595)
at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:516)
at org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler.handleRequestMessage(HttpRequestExecutingMessageHandler.java:382)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:129)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:272)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
"http-nio-8081-AsyncTimeout@5025" daemon prio=5 tid=0x1a nid=NA sleeping
java.lang.Thread.State: TIMED_WAITING
at java.lang.Thread.sleep(Thread.java:-1)
at org.apache.coyote.AbstractProtocol$AsyncTimeout.run(AbstractProtocol.java:1137)
at java.lang.Thread.run(Thread.java:745)
"container-0@4216" prio=5 tid=0x12 nid=NA sleeping
java.lang.Thread.State: TIMED_WAITING
at java.lang.Thread.sleep(Thread.java:-1)
at org.apache.catalina.core.StandardServer.await(StandardServer.java:427)
at org.springframework.boot.context.embedded.tomcat.TomcatEmbeddedServletContainer$1.run(TomcatEmbeddedServletContainer.java:166)
"ContainerBackgroundProcessor[StandardEngine[Tomcat]]@4204" daemon prio=5 tid=0x11 nid=NA sleeping
java.lang.Thread.State: TIMED_WAITING
at java.lang.Thread.sleep(Thread.java:-1)
at org.apache.catalina.core.ContainerBase$ContainerBackgroundProcessor.run(ContainerBase.java:1339)
at java.lang.Thread.run(Thread.java:745)
"Finalizer@5037" daemon prio=8 tid=0x3 nid=NA waiting
java.lang.Thread.State: WAITING
at java.lang.Object.wait(Object.java:-1)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)
"Reference Handler@5038" daemon prio=10 tid=0x2 nid=NA waiting
java.lang.Thread.State: WAITING
at java.lang.Object.wait(Object.java:-1)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)
"DestroyJavaVM@5035" prio=5 tid=0x1c nid=NA runnable
java.lang.Thread.State: RUNNABLE
"Signal Dispatcher@5036" daemon prio=9 tid=0x4 nid=NA runnable
java.lang.Thread.State: RUNNABLE
Upvotes: 1
Views: 2229
Reputation: 389
Gary Russell helped me out in the comments so I thought I'd post what turned out to be the solution as an answer in case anyone has the same issue. The issue was not with the poller at all, but with the HttpRequestExecutingMessageHandler
. The stacktrace shows the task-scheduler-2
thread hanging in the following state:
"task-scheduler-2@4837" prio=5 tid=0x14 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(SocketInputStream.java:-1)
This indicated that the request to the backend was hanging. There was no timeout set on the RestTemplate
used by the message handler, so the thread would wait forever for a request that would never come, which caused the entire application to appear as though it had stopped. My solution was to add the following ClientHttpRequestFactory
to the RestTemplate
bean:
@Bean
public ClientHttpRequestFactory clientHttpRequestFactory() {
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
factory.setReadTimeout(500);
factory.setConnectTimeout(500);
return factory;
}
Setting a nonempty read and connect timeout causes the application to log an exception when the backend times out, after which the application continues running as normal. Many thanks to Gary Russell for helping me figure out this perplexing issue.
Upvotes: 0
Reputation: 174554
See Configuring the Task Scheduler bean - it only has 10 threads by default.
With queue channels in the mix, the threads are suspended for the receiveTimeout
(1 second by default). If you have a lot of queue channels you can experience thread starvation.
As an aside, you generally don't need lots of queue channels; typically a single thread handoff is enough in a flow.
Upvotes: 1