Reputation: 41
I have an application (Scala 2.10.3, Akka 2.3.1, Camel 2.13.0) which subscribes to a JMS Topic, and is notified via JMS messages when specific files are available for download. Each JMS message includes the name+path of a file which is available for collection via SFTP.
I then want to be able to fetch the files via SFTP, but only fetch files for which we have received a JMS message (to avoid problems where we might fetch a file which is in-progress of being written).
I want a solution which fits in with Akka Camel and the Consumer model. I've read through the Camel options for file2 and ftp2 which are used for SFTP endpoints, but I need help with:
how can I define a class/object which can be referenced in the endpointUri string via &filter=... parameter? I would want to be able to update the filter object so that every time the Consumer polls for a list of files, the updated filter list is applied.
how can I define a custom IdempotentRepository, to allow cache sizes larger than the default of 1000?
My SFTP Consumer Actor currently looks like this (with some values redacted ...):
class SftpConsumer extends Actor with ActorLogging with Consumer {
val host = ...
val username = ...
val keyFile = ...
def endpointUri: String = s"sftp://${host}?username=${username}&privateKeyFile=${keyFile}&idempotent=true"
Upvotes: 1
Views: 1022
Reputation: 41
The filter and idempotentRepository parameters need to refer to objects (by name) in the registry.
For the filter, you need to create an object of a class which extends org.apache.camel.component.file.GenericFileFilter.
For the filter and/or idempotentRepository, you need to create a registry, assign the registry to the Camel context, and register these objects to the registry e.g.
// define a class which extends GenericFileFilter[T], and which
// filters for files which are explicitly allowed via include()
class MyFileFilter extends GenericFileFilter[File] {
var include = Set[String]()
def accept(file: GenericFile[File]) = include.contains(file.getFileName)
def include(filename: String) = include = include + filename
def exclude(filename: String) = include = include - filename
}
// Create a filter and a registry, add a mapping for the file filter to
// the registry, and assign the registry to the camel context
val myFileFilter = new MyFileFilter()
val simpleRegistry = new SimpleRegistry()
simpleRegistry.put("myFilter", myFileFilter )
camel.context.setRegistry(simpleRegistry);
// create a memory-based idempotent repository with a custom cache size
val cacheSize = 2500
val myRepository = MemoryIdempotentRepository.memoryIdempotentRepository(cacheSize)
simpleRegistry.put("myRepository", myRepository)
// adjust the endpointUri to include the &filter= and &idempotentRepository= parameters
def endpointUri: String = s"sftp://${host}?username=${username}...&idempotent=true&idempotentRepository=#myRepository&filter=#myFilter"
Upvotes: 3