Rok Purkeljc

Reputation: 27

Spring Integration - Handling stale sftp sessions

I have implemented the following scenario:

  1. A QueueChannel holding messages with byte[] payloads
  2. A MessageHandler that polls the queue channel and uploads the files over sftp
  3. A Transformer that listens on errorChannel and sends the payload extracted from the failed message back to the queue channel (intended as an error handler, so that no failed message gets lost)

If the sftp server is online, everything works as expected.

If the sftp server is down, the error message that arrives at the transformer is:

org.springframework.messaging.MessagingException: Failed to obtain pooled item; nested exception is java.lang.IllegalStateException: failed to create SFTP Session

The transformer cannot do anything with this: the failedMessage of the exception carried in the payload is null, so the transformer throws an exception itself and the message is lost.

How can I configure my flow so that the transformer receives the right message, with the payload of the file that failed to upload?

My Configuration:

  @Bean
  public MessageChannel toSftpChannel() {
    final QueueChannel channel = new QueueChannel();
    channel.setLoggingEnabled(true);
    // Return the configured instance rather than a second, unconfigured channel.
    return channel;
  }

  @Bean
  public MessageChannel toSplitter() {
    return new PublishSubscribeChannel();
  }

  @Bean
  @ServiceActivator(inputChannel = "toSftpChannel", poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
  public MessageHandler handler() {
    final SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
    handler.setRemoteDirectoryExpression(new LiteralExpression(sftpRemoteDirectory));
    handler.setFileNameGenerator(message -> {
      if (message.getPayload() instanceof byte[]) {
        return (String) message.getHeaders().get("name");
      } else {
        throw new IllegalArgumentException("byte[] expected in Payload!");
      }
    });
    return handler;
  }

  @Bean
  public SessionFactory<LsEntry> sftpSessionFactory() {
    final DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);

    final Properties jschProps = new Properties();
    jschProps.put("StrictHostKeyChecking", "no");
    jschProps.put("PreferredAuthentications", "publickey,password");
    factory.setSessionConfig(jschProps);

    factory.setHost(sftpHost);
    factory.setPort(sftpPort);
    factory.setUser(sftpUser);
    if (sftpPrivateKey != null) {
      factory.setPrivateKey(sftpPrivateKey);
      factory.setPrivateKeyPassphrase(sftpPrivateKeyPassphrase);
    } else {
      factory.setPassword(sftpPasword);
    }
    factory.setAllowUnknownKeys(true);
    return new CachingSessionFactory<>(factory);
  }

  @Bean
  @Splitter(inputChannel = "toSplitter")
  public DmsDocumentMessageSplitter splitter() {
    final DmsDocumentMessageSplitter splitter = new DmsDocumentMessageSplitter();
    splitter.setOutputChannelName("toSftpChannel");
    return splitter;
  }

  @Transformer(inputChannel = "errorChannel", outputChannel = "toSftpChannel")
  public Message<?> errorChannelHandler(ErrorMessage errorMessage) throws RuntimeException {

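    // Unwrap the message that failed to upload from the MessagingException.
    Message<?> failedMessage = ((MessagingException) errorMessage.getPayload())
      .getFailedMessage();
    // Resend the original payload (not the Message object itself) with its headers.
    return MessageBuilder.withPayload(failedMessage.getPayload())
                         .copyHeadersIfAbsent(failedMessage.getHeaders())
                         .build();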
  }

  @MessagingGateway 
  public interface UploadGateway {

    @Gateway(requestChannel = "toSplitter")
    void upload(@Payload List<byte[]> payload, @Header("header") DmsDocumentUploadRequestHeader header);
  }

Thanks..

Update

  @Bean(PollerMetadata.DEFAULT_POLLER)
  @Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED)
  PollerMetadata poller() {
    return Pollers
      .fixedRate(5000)
      .maxMessagesPerPoll(1)
      .receiveTimeout(500)
      .taskExecutor(taskExecutor())
      .transactionSynchronizationFactory(transactionSynchronizationFactory())
      .get();
  }

  @Bean
  @ServiceActivator(inputChannel = "toMessageStore", poller = @Poller(PollerMetadata.DEFAULT_POLLER))
  public BridgeHandler bridge() {
    BridgeHandler bridgeHandler = new BridgeHandler();
    bridgeHandler.setOutputChannelName("toSftpChannel");
    return bridgeHandler;
  }

Upvotes: 0

Views: 1831

Answers (1)

Gary Russell

Reputation: 174534

The null failedMessage is a bug; reproduced INT-4421.

I would not recommend using a QueueChannel for this scenario. If you use a direct channel, you can configure a retry advice to attempt redeliveries. When the retries are exhausted (if so configured), the exception is thrown back to the calling thread.

Add the advice to the SftpMessageHandler's adviceChain property.
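
As a rough illustration of the retry advice mentioned above, a bean along the following lines could be declared. This is only a sketch: the class name SftpRetryConfig, the bean name sftpRetryAdvice, the three attempts and the 10-second back-off are assumptions chosen for the example, not values from the question. How the advice is attached to the handler's advice chain depends on the configuration style and Spring Integration version in use, so that wiring is deliberately left out.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
public class SftpRetryConfig {

    // Hypothetical bean name; attempt count and back-off period are illustrative only.
    @Bean
    public RequestHandlerRetryAdvice sftpRetryAdvice() {
        // Give up after three delivery attempts in total.
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);

        // Wait 10 seconds between attempts so a briefly unavailable server can recover.
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(10_000L);

        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(retryPolicy);
        retryTemplate.setBackOffPolicy(backOffPolicy);

        // The advice re-invokes the advised handler according to the template.
        RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
        advice.setRetryTemplate(retryTemplate);
        return advice;
    }
}
```

With no recovery callback set on the advice, the last exception propagates to the caller once the attempts are used up, which matches the behaviour described above for a direct channel.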

EDIT

You can work around the "missing" failedMessage by inserting a bridge between the pollable channel and the sftp adapter:

@Bean
@ServiceActivator(inputChannel = "toSftpChannel", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "1"))
public BridgeHandler bridge() {
    BridgeHandler bridgeHandler = new BridgeHandler();
    bridgeHandler.setOutputChannelName("toRealSftpChannel");
    return bridgeHandler;
}

@Bean
@ServiceActivator(inputChannel = "toRealSftpChannel")
public MessageHandler handler() {
    final SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
    handler.setRemoteDirectoryExpression(new LiteralExpression("foo"));
    handler.setFileNameGenerator(message -> {
        if (message.getPayload() instanceof byte[]) {
            return (String) message.getHeaders().get("name");
        }
        else {
            throw new IllegalArgumentException("byte[] expected in Payload!");
        }
    });
    return handler;
}

Upvotes: 1
