Reputation: 195
I have a Spring Boot Application and want to receive Messages from multiple AWS SQS queues. These queues all have their own credentials (and there is sadly nothing I can do about that). None of these credentials can access one of the other queues, they are all restricted to exactly one queue.
With only one queue and credential, it is simple. I just have to provide the credentials as AWSCredentialsProvider
Bean and annotate my Method with @SqsListener
\ @EnableSqs
.
But I can not figure out how to do it with multiple credentials.
The @SqsListener
annotation has no way to provide credentials, or a preconfigured AmazonSqs
object, or anything else that would help.
I searched for a way to map a queue to credentials, by extending the CredentialsProvider
or the AmazonSqs
client, but to no avail.
I even tried to inject the credendials in the header of the AmazonHttpClient but that was also not possible.
I tried to create everything needed to listen to an SQS queue manually.
But I'm stuck at creating a MessageHandler for the SimpleMessageListenerContainer
.
The required QueueMessageHandler
only works when created as bean, with an application context.
Otherwise it won't look up methods annotated with @SqsListener
.
Sadly, the only tutorials or examples I could find either use JMS, which I would like to avoid, or just use the @SqsListener
annotation with only one queue.
Is there any other way to provide different credentials for multiple queues?
My test code:
@Component
@Slf4j
public class TestOneQueueA {
public static final String QUEUE_A = "TestOneQueueA";
public TestOneQueueA(Cloud cloud, ResourceIdResolver resourceIdResolver) {
SqsServiceInfo serviceInfo = (SqsServiceInfo) cloud.getServiceInfo(QUEUE_A);
AWSStaticCredentialsProvider credentialsProvider =
new AWSStaticCredentialsProvider(new BasicAWSCredentials(serviceInfo.getAccessKey(),
serviceInfo.getSecretAccessKey()));
AmazonSQSAsync client = AmazonSQSAsyncClientBuilder.standard()
.withCredentials(credentialsProvider)
.withRegion(serviceInfo.getRegion()).build();
QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
queueMessageHandlerFactory.setAmazonSqs(client);
queueMessageHandlerFactory.setMessageConverters(Collections.singletonList(new MappingJackson2MessageConverter()));
QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
queueMessageHandler.afterPropertiesSet(); // won't do anything because of no ApplicationContext
SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setAmazonSqs(client);
factory.setResourceIdResolver(resourceIdResolver);
factory.setQueueMessageHandler(queueMessageHandler);
SimpleMessageListenerContainer simpleMessageListenerContainer = factory.createSimpleMessageListenerContainer();
simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
try {
simpleMessageListenerContainer.afterPropertiesSet();
} catch (Exception e) {
throw new RuntimeException(e);
}
simpleMessageListenerContainer.start();
simpleMessageListenerContainer.start(QUEUE_A); // fails with "Queue with name 'TestOneQueueA' does not exist"
}
@SqsListener(value = QUEUE_A, deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void receiveMessage(@NotificationMessage TestDto dto, @NotificationSubject String subject) {
log.info("Received SQS Message: \nSubject: %s \n%s", subject, dto);
}
}
Edit:
After trying some more, I was able to inject my AmazonSQS clients in two separate SimpleMessageListenerContainer
. The problem then became the QueueMessageHandler
.
If I create it manually, without bean context, it simply won't look for any methods with the @SqsListener
annotation. And there is no way to set the handlers manually.
If I create it as bean, it will look at every bean for the annotation. So it will also find the method of the queue it is not supposed to look for. And then it will crash because the credentials don't work.
I can not figure out a way to create a QueueMessageHandler
for only a single SqsListener method.
And SimpleMessageListenerContainer
won't take anything except a QueueMessageHandler
.
Upvotes: 7
Views: 7722
Reputation: 195
After spending some time to look for a better solution, I stuck with the following:
package test;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.regions.AwsRegionProvider;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import test.TestDto;
import test.CustomQueueMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory;
import org.springframework.cloud.aws.messaging.config.annotation.NotificationMessage;
import org.springframework.cloud.aws.messaging.config.annotation.NotificationSubject;
import org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;
@Component
public class TestQueue {
private static final String QUEUE_NAME = "TestQueue";
private static final Logger log = LoggerFactory.getLogger(TestQueue.class);
public TestQueue(AWSCredentialsProvider credentialsProvider, AwsRegionProvider regionProvider) {
AmazonSQSAsync client = AmazonSQSAsyncClientBuilder.standard()
.withCredentials(credentialsProvider)
.withRegion(regionProvider.getRegion())
.build();
// custom QueueMessageHandler to initialize only this queue
CustomQueueMessageHandler queueMessageHandler = new CustomQueueMessageHandler();
queueMessageHandler.init(this);
queueMessageHandler.afterPropertiesSet();
SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setAmazonSqs(client);
factory.setQueueMessageHandler(queueMessageHandler);
SimpleMessageListenerContainer simpleMessageListenerContainer = factory.createSimpleMessageListenerContainer();
simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
try {
simpleMessageListenerContainer.afterPropertiesSet();
} catch (Exception e) {
throw new RuntimeException(e);
}
simpleMessageListenerContainer.start();
}
@SqsListener(value = QUEUE_NAME, deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void receiveMessage(@NotificationMessage TestDto dto, @NotificationSubject String subject) {
log.info("Received SQS Message: \nSubject: {} \n{}", subject, dto);
}
}
and the custom QueueMessageHandler
:
package test;
import java.util.Collections;
import org.springframework.cloud.aws.messaging.listener.QueueMessageHandler;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
public class CustomQueueMessageHandler extends QueueMessageHandler {
public void init(Object handler) {
detectHandlerMethods(handler);
}
}
The only purpose of CustomQueueMessageHandler
is to pass a single object where it should scan for SQS annotations.
Since I don't start it with a Spring Context, it won't search every bean for the @SqsListener
annotation.
But all the initialization is hidden behind protected methods.
That's why I needed to overwrite the class, to access those init methods.
I don't think it's a very elegant solution, creating all the AWS client stuff manually, and calling the bean init methods. But it was the only solution I could find that still gives access to all the features of the AWS SQS library, like converting incoming messages and notifications, deletion policy, queue polling including failure handling, etc.
Upvotes: 8
Reputation: 2571
You can declare different @Bean
s for accounts you wish to use with custom @Qualifier
. Let's say you have two SQS queues in two different accounts. Then declare two beans of type AmazonSQS
.
@Qualifier("first")
@Bean public AmazonSQS amazonSQSClient() {
return AmazonSQSClient.builder()
.withCredentials(credentialsProvider())
.withRegion(Regions.US_EAST_1)
.build();
}
@Qualifier("second")
@Bean public AmazonSQS amazonSQSClient() {
return AmazonSQSClient.builder()
.withCredentials(anotherCredentialsProvider())
.withRegion(Regions.US_EAST_1)
.build();
}
Then in your service, you can @Autowired
them.
@Autowired @Qualifier("second") private AmazonSQS sqsSecond;
Upvotes: 1