user101010101
user101010101

Reputation: 1659

Aws integration spring: S3MessageHandler "one way"

I am trying to create a S3MessageHandler which has a "one-way" behaviour which is blocking. I see in the java docs for the above class the lines;

 * The {@link S3ProgressListener} can be supplied to track the transfer progress.
 * Also the listener can be populated into the returned {@link Transfer} afterwards in the downstream flow.

I have not been able to do the above.

I have created the async version correctly using the "request-reply" behaviour enabled with the use of an outputchannel. But not been able to create the blocking version.

  S3MessageHandler s3MessageHandler = new S3MessageHandler(this.amazonS3, bucket, true);
    s3MessageHandler.setCommandExpression(parser.parseExpression("headers.s3Command"));
    s3MessageHandler.setSendTimeout(2000);
    s3MessageHandler.setKeyExpression(parser.parseExpression("headers.key"));
    s3MessageHandler.setProgressListener(this.assessmentProgressListener);
    s3MessageHandler.setOutputChannel(outputChannel);

The above code will output when the transfer has completed to the outputChannel but this is done async. How do I create the blocking version of the above. I am trying to code that it will block until the s3 sync is complete?

Upvotes: 1

Views: 586

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121177

You should share more info on the matter.

For example what have you tried and what results.

We have such a test config:

    @Bean
    @ServiceActivator(inputChannel = "s3SendChannel")
    public MessageHandler s3MessageHandler() {
        S3MessageHandler s3MessageHandler = new S3MessageHandler(amazonS3(), "myBucket");
        s3MessageHandler.setCommandExpression(PARSER.parseExpression("headers.s3Command"));
        Expression keyExpression =
                PARSER.parseExpression("payload instanceof T(java.io.File) ? payload.name : headers.key");
        s3MessageHandler.setKeyExpression(keyExpression);
        s3MessageHandler.setObjectAclExpression(new ValueExpression<>(CannedAccessControlList.PublicReadWrite));
        s3MessageHandler.setUploadMetadataProvider((metadata, message) -> {
            if (message.getPayload() instanceof InputStream) {
                metadata.setContentLength(1);
                metadata.setContentType(MediaType.APPLICATION_JSON_VALUE);
                metadata.setContentDisposition("test.json");
            }
        });
        s3MessageHandler.setProgressListener(s3ProgressListener());
        return s3MessageHandler;
    }

And there is a test which proves that we are blocked waiting for:

AmazonClientException amazonClientException = transfer.waitForException();

Upvotes: 1

Related Questions