Reputation: 1324
We are using spring integration dynamic sftp flows to ingest sftp files . The flow java config looks like below
from(Sftp.inboundAdapter(cachingSessionFactory, (a, b) -> Long.valueOf(a.lastModified())
.compareTo(b.lastModified()))//
.preserveTimestamp(true)//
.remoteDirectory(job.getRemoteDirectory())//
.deleteRemoteFiles(job.getDeleteRemoteFiles())//
.filter(this.compositeRemoteFilter(job))//
.autoCreateLocalDirectory(true)//
.preserveTimestamp(true)//
.maxFetchSize(maxMessagesPerPoll)
.localFilter(new LocalFileFilter(job))//
.localDirectory(localDirectory)),
e -> e.id("testComponent")
.autoStartup(false)//
.poller(Pollers.cron(job.getPollingFreq(), job.timeZone())//
.maxMessagesPerPoll(maxMessagesPerPoll)
.receiveTimeout(1000L)
.handle(UploadHandler)
The caching session factory is something we get dynamically via using a delegate . Most of it works fine but sometimes after running for days we observe some threads stuck in RUNNABLE . Our assumption was if the jsch session was stuck in any way it should eventually come out as we have timeouts both at the session factory level and at the poller .
The dump for the thread looks something like this
java.io.FileInputStream.readBytes(Native Method)java.io.FileInputStream.read(FileInputStream.java:255)
java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
java.io.BufferedInputStream.read(BufferedInputStream.java:265)
com.jcraft.jsch.IO.getByte(IO.java:73)
com.jcraft.jsch.Session.connect(Session.java:263)
com.jcraft.jsch.Session.connect(Session.java:183)
org.springframework.integration.sftp.session.SftpSession.connect(SftpSession.java:268)
org.springframework.integration.sftp.session.DefaultSftpSessionFactory.getSession(DefaultSftpSessionFactory.java:390)
custom.adapters.session.LogEnabledSftpSessionFactory.getSession(LogEnabledSftpSessionFactory.java:44)
custom.adapters.session.LogEnabledSftpSessionFactory.getSession(LogEnabledSftpSessionFactory.java:15)
org.springframework.integration.file.remote.session.CachingSessionFactory$1.createForPool(CachingSessionFactory.java:84)
org.springframework.integration.file.remote.session.CachingSessionFactory$1.createForPool(CachingSessionFactory.java:81)
org.springframework.integration.util.SimplePool.doGetItem(SimplePool.java:195)
org.springframework.integration.util.SimplePool.getItem(SimplePool.java:176)
org.springframework.integration.file.remote.session.CachingSessionFactory.getSession(CachingSessionFactory.java:135)
custom.integration.DelegatingLocatorBasedSessionFactory.getSession(DelegatingLocatorBasedSessionFactory.java:80)
custom.DelegatingLocatorBasedSessionFactory.getSession(DelegatingLocatorBasedSessionFactory.java:67)
org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:432)
org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:308)
org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:258)
org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:64)
org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource.doReceive(AbstractFetchLimitingMessageSource.java:45)
org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:160)org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:250)
org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:360)
org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$1934/1648215776.call(Unknown Source)
org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$2062/2127922639.run(Unknown Source)
org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
org.springframework.integration.util.ErrorHandlingTaskExecutor$$Lambda$2063/1949167295.run(Unknown Source)
org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$1935/1382748208.run(Unknown Source)org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:67)
org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Please help if we are missing anything here or if there is some configuration we can do on the si side to fix this . SI version 5.1.13
Another heap dump trace of thread
"Name","Retained Size","Shallow Size","Level"
"java.lang.Thread [Thread, Stack Local] ""my-taskScheduler-42"" tid=348 [RUNNABLE]","54768","120","1"
"contextClassLoader org.springframework.boot.loader.LaunchedURLClassLoader [Stack Local]","10324089","80","2"
"<local variable> com.jcraft.jsch.Session [Stack Local]","21232","256","2"
"threadLocals java.lang.ThreadLocal$ThreadLocalMap","15896","24","2"
"<local variable> java.lang.UNIXProcess$ProcessPipeInputStream [Monitor Used, Stack Local]","8264","40","2"
"<local variable> org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizingMessageSource [Stack Local]","4784","96","2"
"<local variable> org.springframework.integration.endpoint.SourcePollingChannelAdapter [Stack Local]","2608","176","2"
"<local variable> java.util.concurrent.ScheduledThreadPoolExecutor [Stack Local]","2392","80","2"
"<local variable> org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizer [Stack Local]","2168","64","2"
"<local variable> org.springframework.integration.file.remote.RemoteFileTemplate [Stack Local]","824","64","2"
"<local variable> org.springframework.integration.util.SimplePool [Stack Local]","744","56","2"
"group java.lang.ThreadGroup","656","48","2"
"<local variable> custom.adapters.session.LogEnabledSftpSessionFactory [Stack Local]","512","120","2"
"<local variable> org.springframework.scheduling.concurrent.ReschedulingRunnable [Stack Local]","232","48","2"
"inheritableThreadLocals java.lang.ThreadLocal$ThreadLocalMap","104","24","2"
"inheritedAccessControlContext java.security.AccessControlContext","88","40","2"
"name java.lang.String ""my-taskScheduler-42""","80","24","2"
"<local variable> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask [Stack Local]","72","72","2"
"<local variable> org.springframework.integration.sftp.session.SftpSession [Stack Local]","56","32","2"
"<local variable> java.util.concurrent.ThreadPoolExecutor$Worker [Stack Local]","48","48","2"
"target java.util.concurrent.ThreadPoolExecutor$Worker [Stack Local]","48","48","2"
"<local variable> org.springframework.integration.util.ErrorHandlingTaskExecutor [Stack Local]","40","24","2"
"<local variable> org.springframework.integration.sftp.session.JSchSessionWrapper [Stack Local]","40","24","2"
"<local variable> java.io.FileDescriptor [JNI Local]","32","32","2"
"<local variable> org.springframework.integration.file.remote.session.CachingSessionFactory [Stack Local]","32","32","2"
"pool org.springframework.integration.util.SimplePool [Stack Local]","744","56","3"
"sessionFactory custom.adapters.session.LogEnabledSftpSessionFactory [Stack Local]","512","120","3"
"jsch com.jcraft.jsch.JSch","736","32","4"
"proxy custom.adapters.session.SftpProxyCommand","328","32","4"
"sessionConfig java.util.Properties size = 2","176","48","4"
"sharedSessionLock java.util.concurrent.locks.ReentrantReadWriteLock","120","24","4"
"host java.lang.String ""sftp.server""","80","24","4"
"host java.lang.String ""sftp.server""","80","24","4"
"<class> custom.adapters.session.LogEnabledSftpSessionFactory org.springframework.boot.loader.LaunchedURLClassLoader","64","64","4"
"password java.lang.String ""@#@#@#@#""","64","24","4"
"user java.lang.String ""user""","64","24","4"
"user java.lang.String ""user""","64","24","4"
"enableDaemonThread java.lang.Boolean = false","16","16","4"
"serverAliveCountMax java.lang.Integer = 4 0x00000004","16","16","4"
"serverAliveInterval java.lang.Integer = 240,000 0x0003A980","16","16","4"
"timeout java.lang.Integer = 120,000 0x0001D4C0","16","16","4"
"userInfoWrapper org.springframework.integration.sftp.session.DefaultSftpSessionFactory$UserInfoWrapper","16","16","4"
"allowUnknownKeys = boolean false","","1","4"
"isSharedSession = boolean false","","1","4"
"port = int 22 0x00000016","","4","4"
"port = int 22 0x00000016","","4","4"
"<class> org.springframework.integration.file.remote.session.CachingSessionFactory org.springframework.boot.loader.LaunchedURLClassLoader","96","72","3"
"<loader> org.springframework.boot.loader.LaunchedURLClassLoader [Stack Local]","10324089","80","4"
"<protection domain> java.security.ProtectionDomain","400","40","4"
"logger org.apache.commons.logging.LogAdapter$Slf4jLocationAwareLog","24","24","4"
"isSharedSessionCapable = boolean true","","1","3"
"sharedSessionEpoch = long 0","","8","3"
"testSession = boolean true","","1","3"
"<local variable> java.util.concurrent.Executors$RunnableAdapter [Stack Local]","24","24","2"
"<local variable> java.util.Date [Stack Local] = 2021-01-19 20:30:17.000","24","24","2"
"blockerLock java.lang.Object","16","16","2"
"daemon = boolean false","","1","2"
"eetop = long 28,082,176 0x0000000001AC8000","","8","2"
"nativeParkEventPointer = long 140,660,716,930,496 0x00007FEE201105C0","","8","2"
"priority = int 5 0x00000005","","4","2"
"single_step = boolean false","","1","2"
"stackSize = long 0","","8","2"
"stillborn = boolean false","","1","2"
"threadLocalRandomProbe = int -884,406,543 0xCB4906F1","","4","2"
"threadLocalRandomSecondarySeed = int 0","","4","2"
"threadLocalRandomSeed = long -7,128,783,728,802,150,278 0x9D1178F7F429C87A","","8","2"
"threadStatus = int 5 0x00000005","","4","2"
"tid = long 348 0x000000000000015C","","8","2"
Proxy custom code for tunneling
public class SftpProxyCommand implements Proxy
{
String command;
Process p = null;
InputStream in = null;
OutputStream out = null;
public SftpProxyCommand(String appUser, String privateKeyLocation, String jumpHost)
{
this.command = on(" ").join("ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -i",
privateKeyLocation, "-l", appUser, jumpHost, "nc %h %p");
}
public void connect(SocketFactory socket_factory, String host, int port, int timeout) throws Exception
{
String _command = command.replace("%h", host);
_command = _command.replace("%p", new Integer(port).toString());
p = Runtime.getRuntime().exec(_command);
LOG.debug("Sftp Command : {}", _command);
in = p.getInputStream();
out = p.getOutputStream();
}
public Socket getSocket()
{
return null;
}
public InputStream getInputStream()
{
return in;
}
public OutputStream getOutputStream()
{
return out;
}
public void close()
{
try
{
if (p != null)
{
p.getErrorStream().close();
p.getOutputStream().close();
p.getInputStream().close();
p.destroy();
p = null;
}
}
catch (IOException e)
{
LOG.error("Issue in closing sftp command", e);
}
}
}
Upvotes: 1
Views: 421