Gaurav Rawat
Gaurav Rawat

Reputation: 1324

spring integration | See threads stuck in RUNNABLE for a long time when connecting via sftp inbound adapters

We are using spring integration dynamic sftp flows to ingest sftp files . The flow java config looks like below

   from(Sftp.inboundAdapter(cachingSessionFactory, (a, b) -> Long.valueOf(a.lastModified())
            .compareTo(b.lastModified()))//
            .preserveTimestamp(true)//
            .remoteDirectory(job.getRemoteDirectory())//
            .deleteRemoteFiles(job.getDeleteRemoteFiles())//
            .filter(this.compositeRemoteFilter(job))//
            .autoCreateLocalDirectory(true)//
            .preserveTimestamp(true)//
            .maxFetchSize(maxMessagesPerPoll)
            .localFilter(new LocalFileFilter(job))//
            .localDirectory(localDirectory)),
            e -> e.id("testComponent")
                  .autoStartup(false)//
                  .poller(Pollers.cron(job.getPollingFreq(), job.timeZone())//
                        .maxMessagesPerPoll(maxMessagesPerPoll)
                        .receiveTimeout(1000L)    
                        .handle(UploadHandler)

The caching session factory is something we get dynamically via using a delegate . Most of it works fine but sometimes after running for days we observe some threads stuck in RUNNABLE . Our assumption was if the jsch session was stuck in any way it should eventually come out as we have timeouts both at the session factory level and at the poller .

The dump for the thread looks something like this


java.io.FileInputStream.readBytes(Native Method)java.io.FileInputStream.read(FileInputStream.java:255)
java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
java.io.BufferedInputStream.read(BufferedInputStream.java:265)
com.jcraft.jsch.IO.getByte(IO.java:73)
com.jcraft.jsch.Session.connect(Session.java:263)
com.jcraft.jsch.Session.connect(Session.java:183)
org.springframework.integration.sftp.session.SftpSession.connect(SftpSession.java:268)
org.springframework.integration.sftp.session.DefaultSftpSessionFactory.getSession(DefaultSftpSessionFactory.java:390)
custom.adapters.session.LogEnabledSftpSessionFactory.getSession(LogEnabledSftpSessionFactory.java:44)
custom.adapters.session.LogEnabledSftpSessionFactory.getSession(LogEnabledSftpSessionFactory.java:15)
org.springframework.integration.file.remote.session.CachingSessionFactory$1.createForPool(CachingSessionFactory.java:84)
org.springframework.integration.file.remote.session.CachingSessionFactory$1.createForPool(CachingSessionFactory.java:81)
org.springframework.integration.util.SimplePool.doGetItem(SimplePool.java:195)
org.springframework.integration.util.SimplePool.getItem(SimplePool.java:176)
org.springframework.integration.file.remote.session.CachingSessionFactory.getSession(CachingSessionFactory.java:135)
custom.integration.DelegatingLocatorBasedSessionFactory.getSession(DelegatingLocatorBasedSessionFactory.java:80)
custom.DelegatingLocatorBasedSessionFactory.getSession(DelegatingLocatorBasedSessionFactory.java:67)
org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:432)
org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:308)
org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:258)
org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:64)
org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource.doReceive(AbstractFetchLimitingMessageSource.java:45)
org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:160)org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:250)
org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:360)
org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$1934/1648215776.call(Unknown Source)
org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$2062/2127922639.run(Unknown Source)
org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
org.springframework.integration.util.ErrorHandlingTaskExecutor$$Lambda$2063/1949167295.run(Unknown Source)
org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$1935/1382748208.run(Unknown Source)org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:67)
org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

Please help if we are missing anything here or if there is some configuration we can do on the si side to fix this . SI version 5.1.13

Another heap dump trace of thread

"Name","Retained Size","Shallow Size","Level"
"java.lang.Thread [Thread, Stack Local] ""my-taskScheduler-42"" tid=348 [RUNNABLE]","54768","120","1"
"contextClassLoader  org.springframework.boot.loader.LaunchedURLClassLoader [Stack Local]","10324089","80","2"
"<local variable>  com.jcraft.jsch.Session [Stack Local]","21232","256","2"
"threadLocals  java.lang.ThreadLocal$ThreadLocalMap","15896","24","2"
"<local variable>  java.lang.UNIXProcess$ProcessPipeInputStream [Monitor Used, Stack Local]","8264","40","2"
"<local variable>  org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizingMessageSource [Stack Local]","4784","96","2"
"<local variable>  org.springframework.integration.endpoint.SourcePollingChannelAdapter [Stack Local]","2608","176","2"
"<local variable>  java.util.concurrent.ScheduledThreadPoolExecutor [Stack Local]","2392","80","2"
"<local variable>  org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizer [Stack Local]","2168","64","2"
"<local variable>  org.springframework.integration.file.remote.RemoteFileTemplate [Stack Local]","824","64","2"
"<local variable>  org.springframework.integration.util.SimplePool [Stack Local]","744","56","2"
"group  java.lang.ThreadGroup","656","48","2"
"<local variable>  custom.adapters.session.LogEnabledSftpSessionFactory [Stack Local]","512","120","2"
"<local variable>  org.springframework.scheduling.concurrent.ReschedulingRunnable [Stack Local]","232","48","2"
"inheritableThreadLocals  java.lang.ThreadLocal$ThreadLocalMap","104","24","2"
"inheritedAccessControlContext  java.security.AccessControlContext","88","40","2"
"name  java.lang.String ""my-taskScheduler-42""","80","24","2"
"<local variable>  java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask [Stack Local]","72","72","2"
"<local variable>  org.springframework.integration.sftp.session.SftpSession [Stack Local]","56","32","2"
"<local variable>  java.util.concurrent.ThreadPoolExecutor$Worker [Stack Local]","48","48","2"
"target  java.util.concurrent.ThreadPoolExecutor$Worker [Stack Local]","48","48","2"
"<local variable>  org.springframework.integration.util.ErrorHandlingTaskExecutor [Stack Local]","40","24","2"
"<local variable>  org.springframework.integration.sftp.session.JSchSessionWrapper [Stack Local]","40","24","2"
"<local variable>  java.io.FileDescriptor [JNI Local]","32","32","2"
"<local variable>  org.springframework.integration.file.remote.session.CachingSessionFactory [Stack Local]","32","32","2"
"pool  org.springframework.integration.util.SimplePool [Stack Local]","744","56","3"
"sessionFactory  custom.adapters.session.LogEnabledSftpSessionFactory [Stack Local]","512","120","3"
"jsch  com.jcraft.jsch.JSch","736","32","4"
"proxy  custom.adapters.session.SftpProxyCommand","328","32","4"
"sessionConfig  java.util.Properties size = 2","176","48","4"
"sharedSessionLock  java.util.concurrent.locks.ReentrantReadWriteLock","120","24","4"
"host  java.lang.String ""sftp.server""","80","24","4"
"host  java.lang.String ""sftp.server""","80","24","4"
"<class>  custom.adapters.session.LogEnabledSftpSessionFactory org.springframework.boot.loader.LaunchedURLClassLoader","64","64","4"
"password  java.lang.String ""@#@#@#@#""","64","24","4"
"user  java.lang.String ""user""","64","24","4"
"user  java.lang.String ""user""","64","24","4"
"enableDaemonThread  java.lang.Boolean = false","16","16","4"
"serverAliveCountMax  java.lang.Integer = 4  0x00000004","16","16","4"
"serverAliveInterval  java.lang.Integer = 240,000  0x0003A980","16","16","4"
"timeout  java.lang.Integer = 120,000  0x0001D4C0","16","16","4"
"userInfoWrapper  org.springframework.integration.sftp.session.DefaultSftpSessionFactory$UserInfoWrapper","16","16","4"
"allowUnknownKeys = boolean false","","1","4"
"isSharedSession = boolean false","","1","4"
"port = int 22  0x00000016","","4","4"
"port = int 22  0x00000016","","4","4"
"<class>  org.springframework.integration.file.remote.session.CachingSessionFactory org.springframework.boot.loader.LaunchedURLClassLoader","96","72","3"
"<loader>  org.springframework.boot.loader.LaunchedURLClassLoader [Stack Local]","10324089","80","4"
"<protection domain>  java.security.ProtectionDomain","400","40","4"
"logger  org.apache.commons.logging.LogAdapter$Slf4jLocationAwareLog","24","24","4"
"isSharedSessionCapable = boolean true","","1","3"
"sharedSessionEpoch = long 0","","8","3"
"testSession = boolean true","","1","3"
"<local variable>  java.util.concurrent.Executors$RunnableAdapter [Stack Local]","24","24","2"
"<local variable>  java.util.Date [Stack Local] = 2021-01-19 20:30:17.000","24","24","2"
"blockerLock  java.lang.Object","16","16","2"
"daemon = boolean false","","1","2"
"eetop = long 28,082,176  0x0000000001AC8000","","8","2"
"nativeParkEventPointer = long 140,660,716,930,496  0x00007FEE201105C0","","8","2"
"priority = int 5  0x00000005","","4","2"
"single_step = boolean false","","1","2"
"stackSize = long 0","","8","2"
"stillborn = boolean false","","1","2"
"threadLocalRandomProbe = int -884,406,543  0xCB4906F1","","4","2"
"threadLocalRandomSecondarySeed = int 0","","4","2"
"threadLocalRandomSeed = long -7,128,783,728,802,150,278  0x9D1178F7F429C87A","","8","2"
"threadStatus = int 5  0x00000005","","4","2"
"tid = long 348  0x000000000000015C","","8","2"

Proxy custom code for tunneling

public class SftpProxyCommand implements Proxy
{
    
   String command; 
   Process p = null; 
   InputStream in = null; 
   OutputStream out = null; 
   public SftpProxyCommand(String appUser, String privateKeyLocation, String jumpHost)
   {
      this.command = on(" ").join("ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -i",
            privateKeyLocation, "-l", appUser, jumpHost, "nc %h %p");
   } 
   public void connect(SocketFactory socket_factory, String host, int port, int timeout) throws Exception
   {

      String _command = command.replace("%h", host);
      _command = _command.replace("%p", new Integer(port).toString());
      p = Runtime.getRuntime().exec(_command);
      LOG.debug("Sftp Command : {}", _command);
      in = p.getInputStream();
      out = p.getOutputStream();
   }
 
   public Socket getSocket()
   {
      return null;
   } 
   public InputStream getInputStream()
   {
      return in;
   }
 
   public OutputStream getOutputStream()
   {

      return out;
   }
 
   public void close()
   {
      try
      {
         if (p != null)
         {
            p.getErrorStream().close();
            p.getOutputStream().close();
            p.getInputStream().close();
            p.destroy();
            p = null;
         }
      }
      catch (IOException e)
      {
         LOG.error("Issue in closing sftp command", e);
      }
   }

}

Upvotes: 1

Views: 421

Answers (1)

Gary Russell
Gary Russell

Reputation: 174544

Your proxy is blocking on STDIN.

Upvotes: 1

Related Questions