Reputation: 691
We are using spring-integration with S3. We have s3-inbound-streaming-channel-adapter to read from S3. What is happening is that if the "get" fails the s3-inbound-streaming-channel-adapter put the filename in "acceptOnceFilter" and doesn't retry on failure.
Q1. What we want is when the s3-inbound-streaming-channel-adapter "gets" a file from S3 and say for some reason this "get" fails... how do we get the s3-inbound-streaming-channel-adapter to retry this "get" request again for the same file?
Q2. On failure, an exception is sent to default "errorChannel" from s3-inbound-streaming-channel-adapter. Would the Message in the exception contain "filename" that failed?
<int:channel id="s3FileProcessingChannel">
<int:queue capacity="15"/>
</int:channel>
<bean id="metadataStore" class="org.springframework.integration.metadata.SimpleMetadataStore"/>
<bean id="acceptOnceFilter"
class="org.springframework.integration.aws.support.filters.S3PersistentAcceptOnceFileListFilter">
<constructor-arg index="0" ref="metadataStore"/>
<constructor-arg index="1" value="streaming"/>
</bean>
<int-aws:s3-inbound-streaming-channel-adapter id="s3Region1"
channel="s3FileProcessingChannel"
session-factory="s3SessionFactory"
filter="acceptOnceFilter"
remotedirectoryexpression="'${s3.sourceBucket}/emm'">
<int:poller fixed-delay="1000" max-messages-per-poll="15"/>
</int-aws:s3-inbound-streaming-channel-adapter>
Thanks GM
Upvotes: 0
Views: 310
Reputation: 121177
The S3PersistentAcceptOnceFileListFilter
implements:
/**
* A {@link FileListFilter} that can be reset by removing a specific file from its
* state.
* @author Gary Russell
* @since 4.1.7
*
*/
public interface ResettableFileListFilter<F> extends FileListFilter<F> {
/**
* Remove the specified file from the filter so it will pass on the next attempt.
* @param f the element to remove.
* @return true if the file was removed as a result of this call.
*/
boolean remove(F f);
}
And S3StreamingMessageSource
populates headers like these:
return getMessageBuilderFactory()
.withPayload(session.readRaw(remotePath))
.setHeader(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE, session)
.setHeader(FileHeaders.REMOTE_DIRECTORY, file.getRemoteDirectory())
.setHeader(FileHeaders.REMOTE_FILE, file.getFilename())
.setHeader(FileHeaders.REMOTE_FILE_INFO,
this.fileInfoJson ? file.toJson() : file);
When error happened you just need to use that FileHeaders.REMOTE_FILE
to call the mention above remove()
and your failed file is going to be picked up from the S3 on the next poll cycle.
Upvotes: 1