Rowan Jacobs
Rowan Jacobs

Reputation: 389

Spring Integration poller sometimes runs once then stops

I have the following annotation-based configuration in a Spring Integration app:

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller(@Value("${poll-interval}") int pollInterval) {
    return Pollers.fixedRate(pollInterval).maxMessagesPerPoll(1).get();
}

@Bean
@InboundChannelAdapter(value = "httpRequestChannel", poller = @Poller(PollerMetadata.DEFAULT_POLLER),
    autoStartup = "${auto-start:true}")
public MessageSource<String> httpRequestTrigger() {
    Expression expression = new SpelExpressionParser().parseExpression(
        "#{@timeService.getLastQueryTime()}",
        new TemplateParserContext("#{", "}"));
    return new ExpressionEvaluatingMessageSource<>(expression, String.class);
}

@Bean
@Qualifier("httpRequestChannel")
public MessageChannel httpRequestChannel() {
    return new RendezvousChannel();
}

@Bean
@ServiceActivator(inputChannel = "httpRequestChannel", poller = @Poller(fixedRate="100"))
public MessageHandler httpRequestExecutingMessageHandler(
    URI backendURI,
    @Qualifier("httpReplyChannel") MessageChannel httpReplyChannel,
    RestTemplate restTemplate,
    @Qualifier("filterExpression") Expression filterExpression
) {
    HttpRequestExecutingMessageHandler messageHandler = new HttpRequestExecutingMessageHandler(
        backendURI.toString() + "?filter={filter}", restTemplate);
    messageHandler.setHttpMethod(HttpMethod.GET);
    messageHandler.setExpectedResponseType(String.class);
    messageHandler.setOutputChannel(httpReplyChannel);
    messageHandler.setExpectReply(true);
    messageHandler.setSendTimeout(1000);
    Map<String, Expression> uriVariableExpressions = new HashMap<>();
    uriVariableExpressions.put("filter", filterExpression);
    messageHandler.setUriVariableExpressions(uriVariableExpressions);
    return messageHandler;
}

@Bean
@Qualifier("httpReplyChannel")
public MessageChannel httpReplyChannel() {
    return new DirectChannel();
}

In my application.properties file, auto-start is set to true and poll-interval is set to 2000.

Sometimes when starting up the application, the poller will work as expected, firing every poll-interval milliseconds. However, just as often, when starting up, the poller will fire once and never fire again. Is there an error in my configuration? How do I get the poller to consistently work?

EDIT: I took a thread dump when the poller hanged:

"task-scheduler-1@4820" prio=5 tid=0x13 nid=NA waiting
  java.lang.Thread.State: WAITING
      at sun.misc.Unsafe.park(Unsafe.java:-1)
      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:458)
      at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
      at java.util.concurrent.SynchronousQueue.put(SynchronousQueue.java:877)
      at org.springframework.integration.channel.QueueChannel.doSend(QueueChannel.java:93)
      at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
      at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
      at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
      at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
      at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
      at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:210)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:272)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353)
      at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
      at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
      at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344)
      at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
      at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)

"http-nio-8081-ClientPoller-1@5021" daemon prio=5 tid=0x18 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
      at sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:-1)
      at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
      at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
      at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
      - locked <0x13b0> (a sun.nio.ch.EPollSelectorImpl)
      - locked <0x13b1> (a java.util.Collections$UnmodifiableSet)
      - locked <0x13b2> (a sun.nio.ch.Util$3)
      at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
      at org.apache.tomcat.util.net.NioEndpoint$Poller.run(NioEndpoint.java:790)
      at java.lang.Thread.run(Thread.java:745)

"http-nio-8081-ClientPoller-0@5017" daemon prio=5 tid=0x17 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
      at sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:-1)
      at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
      at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
      at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
      - locked <0x13b3> (a sun.nio.ch.EPollSelectorImpl)
      - locked <0x13b4> (a java.util.Collections$UnmodifiableSet)
      - locked <0x13b5> (a sun.nio.ch.Util$3)
      at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
      at org.apache.tomcat.util.net.NioEndpoint$Poller.run(NioEndpoint.java:790)
      at java.lang.Thread.run(Thread.java:745)

"NioBlockingSelector.BlockPoller-1@5010" daemon prio=5 tid=0x16 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
      at sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:-1)
      at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
      at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
      at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
      - locked <0x13b6> (a sun.nio.ch.EPollSelectorImpl)
      - locked <0x13b7> (a java.util.Collections$UnmodifiableSet)
      - locked <0x13b8> (a sun.nio.ch.Util$3)
      at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
      at org.apache.tomcat.util.net.NioBlockingSelector$BlockPoller.run(NioBlockingSelector.java:339)

"task-scheduler-3@4847" prio=5 tid=0x15 nid=NA waiting
  java.lang.Thread.State: WAITING
      at sun.misc.Unsafe.park(Unsafe.java:-1)
      at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
      at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
      at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1088)
      at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
      at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)

"http-nio-8081-Acceptor-0@5022" daemon prio=5 tid=0x19 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
      at sun.nio.ch.ServerSocketChannelImpl.accept0(ServerSocketChannelImpl.java:-1)
      at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
      at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
      - locked <0x13af> (a java.lang.Object)
      at org.apache.tomcat.util.net.NioEndpoint$Acceptor.run(NioEndpoint.java:456)
      at java.lang.Thread.run(Thread.java:745)

"task-scheduler-2@4837" prio=5 tid=0x14 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
      at java.net.SocketInputStream.socketRead0(SocketInputStream.java:-1)
      at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
      at java.net.SocketInputStream.read(SocketInputStream.java:170)
      at java.net.SocketInputStream.read(SocketInputStream.java:141)
      at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
      at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
      at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
      - locked <0x13b9> (a java.io.BufferedInputStream)
      at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:704)
      at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)
      at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1569)
      - locked <0x13ba> (a sun.net.www.protocol.http.HttpURLConnection)
      at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1474)
      at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
      at org.springframework.http.client.SimpleBufferingClientHttpRequest.executeInternal(SimpleBufferingClientHttpRequest.java:84)
      at org.springframework.http.client.AbstractBufferingClientHttpRequest.executeInternal(AbstractBufferingClientHttpRequest.java:48)
      at org.springframework.http.client.AbstractClientHttpRequest.execute(AbstractClientHttpRequest.java:53)
      at org.springframework.http.client.InterceptingClientHttpRequest$InterceptingRequestExecution.execute(InterceptingClientHttpRequest.java:93)
      at org.howdoevenexist.AuthInterceptor.intercept(AuthInterceptor.java:24)
      at org.springframework.http.client.InterceptingClientHttpRequest$InterceptingRequestExecution.execute(InterceptingClientHttpRequest.java:85)
      at org.springframework.http.client.InterceptingClientHttpRequest.executeInternal(InterceptingClientHttpRequest.java:69)
      at org.springframework.http.client.AbstractBufferingClientHttpRequest.executeInternal(AbstractBufferingClientHttpRequest.java:48)
      at org.springframework.http.client.AbstractClientHttpRequest.execute(AbstractClientHttpRequest.java:53)
      at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:619)
      at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:595)
      at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:516)
      at org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler.handleRequestMessage(HttpRequestExecutingMessageHandler.java:382)
      at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
      at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
      at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:129)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:272)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353)
      at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
      at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
      at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
      at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344)
      at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
      at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)

"http-nio-8081-AsyncTimeout@5025" daemon prio=5 tid=0x1a nid=NA sleeping
  java.lang.Thread.State: TIMED_WAITING
      at java.lang.Thread.sleep(Thread.java:-1)
      at org.apache.coyote.AbstractProtocol$AsyncTimeout.run(AbstractProtocol.java:1137)
      at java.lang.Thread.run(Thread.java:745)

"container-0@4216" prio=5 tid=0x12 nid=NA sleeping
  java.lang.Thread.State: TIMED_WAITING
      at java.lang.Thread.sleep(Thread.java:-1)
      at org.apache.catalina.core.StandardServer.await(StandardServer.java:427)
      at org.springframework.boot.context.embedded.tomcat.TomcatEmbeddedServletContainer$1.run(TomcatEmbeddedServletContainer.java:166)

"ContainerBackgroundProcessor[StandardEngine[Tomcat]]@4204" daemon prio=5 tid=0x11 nid=NA sleeping
  java.lang.Thread.State: TIMED_WAITING
      at java.lang.Thread.sleep(Thread.java:-1)
      at org.apache.catalina.core.ContainerBase$ContainerBackgroundProcessor.run(ContainerBase.java:1339)
      at java.lang.Thread.run(Thread.java:745)

"Finalizer@5037" daemon prio=8 tid=0x3 nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
      at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
      at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

"Reference Handler@5038" daemon prio=10 tid=0x2 nid=NA waiting
  java.lang.Thread.State: WAITING
      at java.lang.Object.wait(Object.java:-1)
      at java.lang.Object.wait(Object.java:502)
      at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
      at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

"DestroyJavaVM@5035" prio=5 tid=0x1c nid=NA runnable
  java.lang.Thread.State: RUNNABLE

"Signal Dispatcher@5036" daemon prio=9 tid=0x4 nid=NA runnable
  java.lang.Thread.State: RUNNABLE

Upvotes: 1

Views: 2229

Answers (2)

Rowan Jacobs
Rowan Jacobs

Reputation: 389

Gary Russell helped me out in the comments so I thought I'd post what turned out to be the solution as an answer in case anyone has the same issue. The issue was not with the poller at all, but with the HttpRequestExecutingMessageHandler. The stacktrace shows the task-scheduler-2 thread hanging in the following state:

"task-scheduler-2@4837" prio=5 tid=0x14 nid=NA runnable
  java.lang.Thread.State: RUNNABLE
      at java.net.SocketInputStream.socketRead0(SocketInputStream.java:-1)

This indicated that the request to the backend was hanging. There was no timeout set on the RestTemplate used by the message handler, so the thread would wait forever for a request that would never come, which caused the entire application to appear as though it had stopped. My solution was to add the following ClientHttpRequestFactory to the RestTemplate bean:

@Bean
public ClientHttpRequestFactory clientHttpRequestFactory() {
    SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
    factory.setReadTimeout(500);
    factory.setConnectTimeout(500);
    return factory;
}

Setting a nonempty read and connect timeout causes the application to log an exception when the backend times out, after which the application continues running as normal. Many thanks to Gary Russell for helping me figure out this perplexing issue.

Upvotes: 0

Gary Russell
Gary Russell

Reputation: 174554

See Configuring the Task Scheduler bean - it only has 10 threads by default.

With queue channels in the mix, the threads are suspended for the receiveTimeout (1 second by default). If you have a lot of queue channels you can experience thread starvation.

As an aside, you generally don't need lots of queue channels; typically a single thread handoff is enough in a flow.

Upvotes: 1

Related Questions