Reputation: 125
I'm using Spring Integration AWS to poll an S3 bucket, fetch files from it, and process them with Spring Integration. Below is what I have:
AmazonS3 amazonS3 = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey));

@Bean
IntegrationFlow fileReadingFlow() {
    return IntegrationFlows
            .from(s3InboundFileSynchronizingMessageSource(),
                    e -> e.poller(p -> p.fixedDelay(30, TimeUnit.SECONDS)))
            .handle(receiptProcessor())
            .get();
}

@Bean
public S3InboundFileSynchronizer s3InboundFileSynchronizer() {
    S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(amazonS3);
    synchronizer.setDeleteRemoteFiles(false);
    synchronizer.setPreserveTimestamp(true);
    synchronizer.setRemoteDirectory(s3BucketName.concat("/").concat(s3InboundFolder));
    synchronizer.setFilter(new S3RegexPatternFileListFilter(".*\\.dat\\.{0,1}\\d{0,2}"));
    return synchronizer;
}

@Bean
public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource() {
    S3InboundFileSynchronizingMessageSource messageSource =
            new S3InboundFileSynchronizingMessageSource(s3InboundFileSynchronizer());
    messageSource.setAutoCreateLocalDirectory(false);
    messageSource.setLocalDirectory(new File(inboundDir));
    messageSource.setLocalFilter(new AcceptOnceFileListFilter<File>());
    return messageSource;
}
My S3 bucket and key are:
bucketName = shipmentReceipts
key = receipts/originalReceipts/inbound/receipt1.dat
I'm facing two issues with this implementation:
1. The local path under inboundDir ends up with the S3 key appended to it, which causes a FileNotFoundException. I traced this to the code below in AbstractInboundFileSynchronizer.java:
protected void copyFileToLocalDirectory(String remoteDirectoryPath, F remoteFile, File localDirectory,
        Session<F> session) throws IOException {
    String remoteFileName = this.getFilename(remoteFile);
    String localFileName = this.generateLocalFileName(remoteFileName); // <-- this line
    String remoteFilePath = remoteDirectoryPath != null
            ? (remoteDirectoryPath + this.remoteFileSeparator + remoteFileName)
            : remoteFileName;
    if (!this.isFile(remoteFile)) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("cannot copy, not a file: " + remoteFilePath);
        }
        return;
    }
    File localFile = new File(localDirectory, localFileName); // <-- and this line
    if (!localFile.exists()) {........
So it ends up looking for the file path C:\SpringAws\S3inbound\receipts\originalReceipts\inbound\receipt1.dat, which it doesn't find, and throws the FileNotFoundException. Instead, it should simply copy the file to the local folder as C:\SpringAws\S3inbound\receipt1.dat.
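The failure can be reproduced without Spring or S3. The sketch below is only an illustration: the class name `NestedKeyDemo` and the helper `canWrite` are hypothetical, and it opens the target file directly rather than going through the synchronizer's temporary file. It shows that opening a file whose name is the full key fails when the nested directories do not exist locally, while the bare file name works.

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

final class NestedKeyDemo {

    /** Returns true if localFileName can be opened for writing under localDirectory. */
    static boolean canWrite(File localDirectory, String localFileName) {
        File localFile = new File(localDirectory, localFileName);
        try (FileOutputStream out = new FileOutputStream(localFile)) {
            return true;
        } catch (FileNotFoundException e) {
            // Raised when a parent directory of localFile does not exist.
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for inboundDir; an empty temporary directory.
        File localDirectory = Files.createTempDirectory("S3inbound").toFile();
        // Full S3 key used as the local name: the nested folders are missing.
        System.out.println(canWrite(localDirectory, "receipts/originalReceipts/inbound/receipt1.dat"));
        // Only the last path segment used as the local name.
        System.out.println(canWrite(localDirectory, "receipt1.dat"));
    }
}
```

So either the nested directories have to exist locally, or the local file name has to be reduced to the last segment of the key.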
2. While pulling the S3 objects, I noticed it was pulling all objects under shipmentReceipts/receipts instead of only those under shipmentReceipts/receipts/originalReceipts/inbound. On debugging further, I found that the code snippet below in S3Session.java is responsible:
@Override
public S3ObjectSummary[] list(String path) throws IOException {
    Assert.hasText(path, "'path' must not be empty String.");
    String[] bucketPrefix = path.split("/");
    Assert.state(bucketPrefix.length > 0 && bucketPrefix[0].length() >= 3,
            "S3 bucket name must be at least 3 characters long.");
    String bucket = resolveBucket(bucketPrefix[0]);
    ListObjectsRequest listObjectsRequest = new ListObjectsRequest()
            .withBucketName(bucket);
    if (bucketPrefix.length > 1) {
        listObjectsRequest.setPrefix(bucketPrefix[1]); // <-- this line
    }
    /*
    For listing objects, Amazon S3 returns up to 1,000 keys in the response.
    If you have more than 1,000 keys in your bucket, the response will be truncated.
    You should always check for if the response is truncated.
    */
    ObjectListing objectListing;
    List<S3ObjectSummary> objectSummaries = new ArrayList<>();
    do {......
It sets the prefix to only the path segment between the first and second forward slash (here, receipts), rather than to everything after the first forward slash.
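To make the effect of the `split("/")` call concrete, here is a small sketch using the path from this question. The class `S3PrefixDemo` and both method names are hypothetical; `prefixViaSplit` mirrors the quoted logic, while `prefixViaSplitWithLimit` is just one possible way to keep the whole remainder as the prefix, not necessarily what the library does.

```java
final class S3PrefixDemo {

    /** Mirrors the quoted logic: only the second path segment becomes the prefix. */
    static String prefixViaSplit(String path) {
        String[] bucketPrefix = path.split("/");
        return bucketPrefix.length > 1 ? bucketPrefix[1] : null;
    }

    /** Hypothetical alternative: split once, so everything after the bucket is the prefix. */
    static String prefixViaSplitWithLimit(String path) {
        String[] bucketPrefix = path.split("/", 2);
        return bucketPrefix.length > 1 ? bucketPrefix[1] : null;
    }

    public static void main(String[] args) {
        String path = "shipmentReceipts/receipts/originalReceipts/inbound";
        System.out.println(prefixViaSplit(path));          // receipts
        System.out.println(prefixViaSplitWithLimit(path)); // receipts/originalReceipts/inbound
    }
}
```

With the first variant the listing request is made with the prefix `receipts`, which matches the observation that everything under shipmentReceipts/receipts is returned.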
How do I mitigate these? Thanks!
Upvotes: 1
Views: 880
Reputation: 24518
@user5758361 The first problem you described, with the nested path, can also be solved by overriding S3FileInfo:
public class S3FileInfo extends org.springframework.integration.aws.support.S3FileInfo {

    private static final ObjectWriter OBJECT_WRITER = new ObjectMapper().writerFor(S3ObjectSummary.class);

    public S3FileInfo(S3ObjectSummary s3ObjectSummary) {
        super(s3ObjectSummary);
    }

    @Override
    public String getFilename() {
        return FilenameUtils.getName(super.getFilename());
    }

    @Override
    public String toJson() {
        try {
            return OBJECT_WRITER.writeValueAsString(super.getFileInfo());
        } catch (JsonProcessingException e) {
            throw new UncheckedIOException(e);
        }
    }
}
toJson is overridden to avoid an NPE for some objects.
To use it for streaming:
public class S3StreamingMessageSource extends org.springframework.integration.aws.inbound.S3StreamingMessageSource {

    public S3StreamingMessageSource(RemoteFileTemplate<S3ObjectSummary> template) {
        super(template, null);
    }

    public S3StreamingMessageSource(RemoteFileTemplate<S3ObjectSummary> template,
            Comparator<AbstractFileInfo<S3ObjectSummary>> comparator) {
        super(template, comparator);
    }

    @Override
    protected List<AbstractFileInfo<S3ObjectSummary>> asFileInfoList(Collection<S3ObjectSummary> collection) {
        return collection.stream()
                .map(S3FileInfo::new)
                .collect(toList());
    }
}
BTW, I'm using Spring Integration 5.0.0.M4 and Spring Integration AWS 1.1.0.M2, and I still have the same issue when using a bucket name like abc/def/.
Upvotes: 0
Reputation: 125
As per Artem, I did use the latest milestone release of spring-integration-aws, but I found it easier to write a custom class that extends AbstractInboundFileSynchronizer to resolve my issues. Here's the class I created:
public class MyAbstractInboundFileSynchronizer extends AbstractInboundFileSynchronizer<S3ObjectSummary> {

    private volatile String remoteFileSeparator = "/";
    private volatile String temporaryFileSuffix = ".writing";
    private volatile boolean deleteRemoteFiles;
    private volatile boolean preserveTimestamp;
    private volatile FileListFilter<S3ObjectSummary> filter;
    private volatile Expression localFilenameGeneratorExpression;
    private volatile EvaluationContext evaluationContext;

    @Override
    public void setLocalFilenameGeneratorExpression(Expression localFilenameGeneratorExpression) {
        super.setLocalFilenameGeneratorExpression(localFilenameGeneratorExpression);
        this.localFilenameGeneratorExpression = localFilenameGeneratorExpression;
    }

    @Override
    public void setIntegrationEvaluationContext(EvaluationContext evaluationContext) {
        super.setIntegrationEvaluationContext(evaluationContext);
        this.evaluationContext = evaluationContext;
    }

    @Override
    public void setRemoteFileSeparator(String remoteFileSeparator) {
        super.setRemoteFileSeparator(remoteFileSeparator);
        this.remoteFileSeparator = remoteFileSeparator;
    }

    public MyAbstractInboundFileSynchronizer() {
        this(new S3SessionFactory());
    }

    public MyAbstractInboundFileSynchronizer(AmazonS3 amazonS3) {
        this(new S3SessionFactory(amazonS3));
    }

    /**
     * Create a synchronizer with the {@link SessionFactory} used to acquire {@link Session} instances.
     * @param sessionFactory The session factory.
     */
    public MyAbstractInboundFileSynchronizer(SessionFactory<S3ObjectSummary> sessionFactory) {
        super(sessionFactory);
        setRemoteDirectoryExpression(new LiteralExpression(null));
        setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "s3MessageSource"));
    }

    @Override
    public final void setRemoteDirectoryExpression(Expression remoteDirectoryExpression) {
        super.setRemoteDirectoryExpression(remoteDirectoryExpression);
    }

    @Override
    public final void setFilter(FileListFilter<S3ObjectSummary> filter) {
        super.setFilter(filter);
    }

    @Override
    protected boolean isFile(S3ObjectSummary file) {
        return true;
    }

    @Override
    protected String getFilename(S3ObjectSummary file) {
        if (file != null) {
            // Keep only the last segment of the key, e.g. "a/b/receipt1.dat" -> "receipt1.dat".
            String key = file.getKey();
            String fileName = key.substring(key.lastIndexOf('/') + 1);
            return fileName;
        }
        else {
            return null;
        }
    }

    @Override
    protected long getModified(S3ObjectSummary file) {
        return file.getLastModified().getTime();
    }

    @Override
    protected void copyFileToLocalDirectory(String remoteDirectoryPath, S3ObjectSummary remoteFile, File localDirectory,
            Session<S3ObjectSummary> session) throws IOException {
        String remoteFileName = this.getFilename(remoteFile);
        //String localFileName = this.generateLocalFileName(remoteFileName);
        String localFileName = remoteFileName;
        String remoteFilePath = remoteDirectoryPath != null
                ? (remoteDirectoryPath + remoteFileName)
                : remoteFileName;
        if (!this.isFile(remoteFile)) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("cannot copy, not a file: " + remoteFilePath);
            }
            return;
        }
        File localFile = new File(localDirectory, localFileName);
        if (!localFile.exists()) {
            String tempFileName = localFile.getAbsolutePath() + this.temporaryFileSuffix;
            File tempFile = new File(tempFileName);
            OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile));
            try {
                session.read(remoteFilePath, outputStream);
            }
            catch (Exception e) {
                if (e instanceof RuntimeException) {
                    throw (RuntimeException) e;
                }
                else {
                    throw new MessagingException("Failure occurred while copying from remote to local directory", e);
                }
            }
            finally {
                try {
                    outputStream.close();
                }
                catch (Exception ignored2) {
                }
            }
            if (tempFile.renameTo(localFile)) {
                if (this.deleteRemoteFiles) {
                    session.remove(remoteFilePath);
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("deleted " + remoteFilePath);
                    }
                }
            }
            if (this.preserveTimestamp) {
                localFile.setLastModified(getModified(remoteFile));
            }
        }
    }
}
I also updated the LocalFilenameGeneratorExpression as per Artem. Thanks!
Upvotes: 0
Reputation: 121177
The first concern, the nested path, is a known issue and has been fixed in the latest 5.0 M3: https://spring.io/blog/2017/04/05/spring-integration-5-0-milestone-3-available with the RecursiveDirectoryScanner.
Meanwhile, you have to specify the LocalFilenameGeneratorExpression as:
Expression expression = PARSER.parseExpression("#this.contains('/') ? #this.substring(#this.lastIndexOf('/') + 1) : #this");
synchronizer.setLocalFilenameGeneratorExpression(expression);
The S3ObjectSummary contains the key as a full path without the bucket.
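For readers less familiar with SpEL, the expression passed to setLocalFilenameGeneratorExpression corresponds roughly to the plain Java below. This is only a sketch: the class `LocalFileNameDemo` and the method `localFileName` are hypothetical names, and it assumes that `#this` in the expression is the remote file name, which for S3 is the object key. `PARSER` is not declared in the snippet; presumably it is a `SpelExpressionParser`.

```java
final class LocalFileNameDemo {

    /** Plain-Java reading of the SpEL expression: keep only the part after the last '/'. */
    static String localFileName(String key) {
        return key.contains("/") ? key.substring(key.lastIndexOf('/') + 1) : key;
    }

    public static void main(String[] args) {
        // Key from the question: the nested folders are dropped.
        System.out.println(localFileName("receipts/originalReceipts/inbound/receipt1.dat")); // receipt1.dat
        // A key without any slash is returned unchanged.
        System.out.println(localFileName("receipt1.dat")); // receipt1.dat
    }
}
```

One consequence of flattening names this way is that two keys in different folders with the same last segment would map to the same local file.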
The second "nested path" issue has been fixed via https://github.com/spring-projects/spring-integration-aws/issues/45. The fix is available in 1.1.0.M1: https://spring.io/blog/2017/03/09/spring-integration-extension-for-aws-1-1-0-m1-available
Upvotes: 1