Reputation: 712
I am trying to use spring-integration-aws to poll an S3 bucket and trigger a spring-batch job. My S3 bucket is not hosted on Amazon; it is on a local minio server, so I have a custom configuration:
@Bean
public AmazonS3 amazonS3(ConfigProperties configProperties) {
    return AmazonS3ClientBuilder.standard()
            // The region matches the one configured on the minio server
            .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:9001", "eu-west-0"))
            .withPathStyleAccessEnabled(configProperties.getS3().isPathStyle())
            .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
                    configProperties.getS3().getAccessKey(), configProperties.getS3().getSecretKey())))
            .build();
}
I defined my IntegrationFlow this way:
@Bean
public IntegrationFlow s3InboundFlow() {
    S3RemoteFileTemplate template = new S3RemoteFileTemplate(new S3SessionFactory(amazonS3));
    S3StreamingMessageSource s3StreamingMessageSource = new S3StreamingMessageSource(template);
    s3StreamingMessageSource.setRemoteDirectory(String.format("%s/OUT/", configProperties.getS3().getBucketDataPath()));
    return IntegrationFlows.from(s3StreamingMessageSource, configurer -> configurer
                    .id("s3InboundAdapter")
                    .autoStartup(true)
                    .poller(Pollers.fixedDelay(POLL, TimeUnit.SECONDS)))
            .handle(jobLaunchingGateway(jobRepository)) // Launch a spring-batch job
            .get();
}
The problem is that when polling occurs, I get the following error:
2020-03-30 19:05:21,008 ERROR [scheduling-1] org.springframework.integration.handler.LoggingHandler: org.springframework.messaging.MessagingException: nested exception is java.lang.IllegalStateException: S3 client with invalid S3 endpoint configured: localhost:9001
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:342)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:275)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:272)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalStateException: S3 client with invalid S3 endpoint configured: localhost:9001
at com.amazonaws.services.s3.AmazonS3Client.getRegion(AmazonS3Client.java:4270)
at org.springframework.integration.aws.support.S3Session.getHostPort(S3Session.java:228)
at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.doReceive(AbstractRemoteFileStreamingMessageSource.java:214)
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:167)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:250)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:359)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:328)
It happens because, when a file is received, some headers are set on the message in AbstractRemoteFileStreamingMessageSource.java:
return getMessageBuilderFactory()
.withPayload(session.readRaw(remotePath))
.setHeader(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE, session)
.setHeader(FileHeaders.REMOTE_DIRECTORY, file.getRemoteDirectory())
.setHeader(FileHeaders.REMOTE_FILE, file.getFilename())
.setHeader(FileHeaders.REMOTE_HOST_PORT, session.getHostPort())
.setHeader(FileHeaders.REMOTE_FILE_INFO,
this.fileInfoJson ? file.toJson() : file);
The REMOTE_HOST_PORT header is populated by the getHostPort() method. getHostPort() in S3Session.java in turn calls getRegion() on the client. The getRegion() method in AmazonS3Client does not check whether the user has set a signing region; it only checks whether the endpoint host matches the "amazonaws.com" pattern, and throws otherwise.
// S3Session.java
@Override
public String getHostPort() {
    Region region = this.amazonS3.getRegion().toAWSRegion();
    return String.format("%s.%s.%s:%d", AmazonS3.ENDPOINT_PREFIX, region.getName(), region.getDomain(), 443);
}

// AmazonS3Client.java
@Override
public synchronized Region getRegion() {
    String authority = super.endpoint.getAuthority();
    if (Constants.S3_HOSTNAME.equals(authority)) {
        return Region.US_Standard;
    } else {
        Matcher m = Region.S3_REGIONAL_ENDPOINT_PATTERN.matcher(authority);
        if (m.matches()) {
            return Region.fromValue(m.group(1));
        } else {
            // A custom endpoint such as localhost:9001 ends up here
            throw new IllegalStateException(
                    "S3 client with invalid S3 endpoint configured: " + authority);
        }
    }
}
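To make the failure concrete, here is a minimal standalone sketch of that endpoint check, using only the JDK. The regular expression is my approximation of the SDK's regional endpoint pattern (an assumption, not copied from the SDK), and the class and method names are hypothetical.

```java
import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class EndpointRegionCheck {

    // Assumption: an approximation of the pattern the SDK uses to
    // recognise regional amazonaws.com endpoints.
    private static final Pattern REGIONAL_ENDPOINT =
            Pattern.compile("s3[-.]([^.]+)\\.amazonaws\\.com(\\.[^.]*)?");

    /** Returns the region parsed from the endpoint authority, or null when it does not match. */
    static String regionOf(String endpoint) {
        String authority = URI.create(endpoint).getAuthority();
        Matcher m = REGIONAL_ENDPOINT.matcher(authority);
        return m.matches() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        // An Amazon-hosted endpoint yields a region
        System.out.println(regionOf("https://s3.eu-west-1.amazonaws.com"));
        // A minio endpoint does not match; this is the case where getRegion() throws
        System.out.println(regionOf("http://localhost:9001"));
    }
}
```

The authority of my endpoint is localhost:9001, which cannot match any amazonaws.com pattern, so the signing region passed to EndpointConfiguration never comes into play.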
How is it possible to poll from S3 with a custom endpoint configuration? Why does the getHostPort() method not check the signing region value? Is it possible to work around this?
Upvotes: 1
Views: 2001
Reputation: 121552
Yes, it is possible to work around this.
You just extend S3SessionFactory to return an extension of S3Session with an overridden getHostPort() method for your custom endpoint.
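import com.amazonaws.services.s3.AmazonS3;
import org.springframework.integration.aws.support.S3Session;
import org.springframework.integration.aws.support.S3SessionFactory;
import org.springframework.util.Assert;

public class MyS3SessionFactory extends S3SessionFactory {

    private final MyS3Session s3Session;

    public MyS3SessionFactory(AmazonS3 amazonS3) {
        super(amazonS3);
        Assert.notNull(amazonS3, "'amazonS3' must not be null.");
        this.s3Session = new MyS3Session(amazonS3);
    }

    // Hand out the custom session instead of the default S3Session
    @Override
    public S3Session getSession() {
        return s3Session;
    }
}

public class MyS3Session extends S3Session {

    public MyS3Session(AmazonS3 amazonS3) {
        super(amazonS3);
    }

    // Skip the region lookup that fails for non-Amazon endpoints
    @Override
    public String getHostPort() {
        return "";
    }
}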
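The getHostPort() override above returns an empty string, which is enough to avoid the exception. If you would rather have a meaningful REMOTE_HOST_PORT header, one option is to derive the value from your endpoint URL. The following is only a sketch using the JDK; the class and method names are hypothetical, and the default ports for a URL without an explicit port are my assumption.

```java
import java.net.URI;

public class EndpointHostPort {

    /** Builds a "host:port" string from an endpoint URL such as http://localhost:9001. */
    static String hostPort(String endpoint) {
        URI uri = URI.create(endpoint);
        int port = uri.getPort();
        if (port == -1) {
            // Assumption: fall back to the scheme's usual default port
            port = "https".equalsIgnoreCase(uri.getScheme()) ? 443 : 80;
        }
        return uri.getHost() + ":" + port;
    }

    public static void main(String[] args) {
        System.out.println(hostPort("http://localhost:9001"));
        System.out.println(hostPort("https://minio.example.com"));
    }
}
```

You could pass the endpoint URL to MyS3Session through its constructor and return the computed value from getHostPort() instead of "".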
Let's discuss a possible fix in the issue you have raised: https://github.com/spring-projects/spring-integration-aws/issues/160
Upvotes: 1