Reputation: 27
I have implemented the following scenario:
If the sftp server is online, everything works as expected.
If the sftp server is down, the errormessage, that arrives as the transformer is:
org.springframework.messaging.MessagingException: Failed to obtain pooled item; nested exception is java.lang.IllegalStateException: failed to create SFTP Session
The transformer cannot do anything with this, since the payload's failedMessage is null and throws an exception itself. The transformer looses the message.
How can I configure my flow to make the tranformer get the right message with the corresponding payload of the unsucsesfully uploaded file?
My Configuration:
@Bean
public MessageChannel toSftpChannel() {
final QueueChannel channel = new QueueChannel();
channel.setLoggingEnabled(true);
return new QueueChannel();
}
@Bean
public MessageChannel toSplitter() {
return new PublishSubscribeChannel();
}
@Bean
@ServiceActivator(inputChannel = "toSftpChannel", poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
public MessageHandler handler() {
final SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpression(new LiteralExpression(sftpRemoteDirectory));
handler.setFileNameGenerator(message -> {
if (message.getPayload() instanceof byte[]) {
return (String) message.getHeaders().get("name");
} else {
throw new IllegalArgumentException("byte[] expected in Payload!");
}
});
return handler;
}
@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
final DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
final Properties jschProps = new Properties();
jschProps.put("StrictHostKeyChecking", "no");
jschProps.put("PreferredAuthentications", "publickey,password");
factory.setSessionConfig(jschProps);
factory.setHost(sftpHost);
factory.setPort(sftpPort);
factory.setUser(sftpUser);
if (sftpPrivateKey != null) {
factory.setPrivateKey(sftpPrivateKey);
factory.setPrivateKeyPassphrase(sftpPrivateKeyPassphrase);
} else {
factory.setPassword(sftpPasword);
}
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<>(factory);
}
@Bean
@Splitter(inputChannel = "toSplitter")
public DmsDocumentMessageSplitter splitter() {
final DmsDocumentMessageSplitter splitter = new DmsDocumentMessageSplitter();
splitter.setOutputChannelName("toSftpChannel");
return splitter;
}
@Transformer(inputChannel = "errorChannel", outputChannel = "toSftpChannel")
public Message<?> errorChannelHandler(ErrorMessage errorMessage) throws RuntimeException {
Message<?> failedMessage = ((MessagingException) errorMessage.getPayload())
.getFailedMessage();
return MessageBuilder.withPayload(failedMessage)
.copyHeadersIfAbsent(failedMessage.getHeaders())
.build();
}
@MessagingGateway
public interface UploadGateway {
@Gateway(requestChannel = "toSplitter")
void upload(@Payload List<byte[]> payload, @Header("header") DmsDocumentUploadRequestHeader header);
}
Thanks..
Update
@Bean(PollerMetadata.DEFAULT_POLLER)
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED)
PollerMetadata poller() {
return Pollers
.fixedRate(5000)
.maxMessagesPerPoll(1)
.receiveTimeout(500)
.taskExecutor(taskExecutor())
.transactionSynchronizationFactory(transactionSynchronizationFactory())
.get();
}
@Bean
@ServiceActivator(inputChannel = "toMessageStore", poller = @Poller(PollerMetadata.DEFAULT_POLLER))
public BridgeHandler bridge() {
BridgeHandler bridgeHandler = new BridgeHandler();
bridgeHandler.setOutputChannelName("toSftpChannel");
return bridgeHandler;
}
Upvotes: 0
Views: 1831
Reputation: 174534
The null
failedMessage
is a bug; reproduced INT-4421.
I would not recommend using a QueueChannel
for this scenario. If you use a direct channel, you can configure a retry advice to attempt redeliveries. when the retries are exhausted (if so configured), the exception will be thrown back to the calling thread.
Add the advice to the SftpMessageHandler
's adviceChain
property.
EDIT
You can work around the "missing" failedMessage by inserting a bridge between the pollable channel and the sftp adapter:
@Bean
@ServiceActivator(inputChannel = "toSftpChannel", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "1"))
public BridgeHandler bridge() {
BridgeHandler bridgeHandler = new BridgeHandler();
bridgeHandler.setOutputChannelName("toRealSftpChannel");
return bridgeHandler;
}
@Bean
@ServiceActivator(inputChannel = "toRealSftpChannel")
public MessageHandler handler() {
final SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpression(new LiteralExpression("foo"));
handler.setFileNameGenerator(message -> {
if (message.getPayload() instanceof byte[]) {
return (String) message.getHeaders().get("name");
}
else {
throw new IllegalArgumentException("byte[] expected in Payload!");
}
});
return handler;
}
Upvotes: 1