Reputation: 5502
I am trying out SNS FIFO topics with SQS FIFO queues. I created an SNS FIFO topic and an SQS FIFO queue, and subscribed the queue to the topic. According to the docs, with this setup every message published to the SNS FIFO topic should fan out to the SQS queue, but that is not happening. PublishResult#getMessageId() returns a value, so the publish call succeeds, but the queue never has any messages in it. Since SNS FIFO topics don't support email subscriptions, the only way I can verify this pub-sub setup is to poll the queue. Because the fan-out is not happening, the queue always appears empty.
The complete code block:
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.CreateTopicRequest;
import com.amazonaws.services.sns.model.CreateTopicResult;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.PublishResult;
import com.amazonaws.services.sns.model.SubscribeRequest;
import com.amazonaws.services.sns.model.SubscribeResult;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.CreateQueueResult;
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.UUID;
class FifoTopicsITest {
@Test
void test() {
final String topicName = UUID.randomUUID().toString().substring(15);
//creating sns client
AmazonSNS amazonSNS = AmazonSNSClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
"<accessKey>", "<secretKey>")))
.withEndpointConfiguration(new AwsClientBuilder
.EndpointConfiguration("https://sns.us-west-1.amazonaws.com",
"us-west-1")).build();
//creating sqs client
AmazonSQS amazonSQS = AmazonSQSClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
"<accessKey>", "<secretKey>")))
.withEndpointConfiguration(new AwsClientBuilder
.EndpointConfiguration("https://sqs.us-west-1.amazonaws.com",
"us-west-1")).build();
//creating SNS topic
CreateTopicRequest createTopicRequest = new CreateTopicRequest().withName(topicName + ".fifo");
createTopicRequest
.addAttributesEntry("FifoTopic", "true")
.addAttributesEntry("ContentBasedDeduplication", "false");
CreateTopicResult topicResult = amazonSNS.createTopic(createTopicRequest);
String topicArn = topicResult.getTopicArn();
//creating dead-letter sqs queue
CreateQueueRequest createDLQQueueRequest = new CreateQueueRequest();
createDLQQueueRequest.addAttributesEntry("FifoQueue", "true");
createDLQQueueRequest.addAttributesEntry("ContentBasedDeduplication", "false");
createDLQQueueRequest.withQueueName(topicName + "_DLQ_" + ".fifo");
CreateQueueResult createDeadLetterQueueResult = amazonSQS.createQueue(createDLQQueueRequest);
//getting ARN value of dead-letter queue
GetQueueAttributesResult getQueueAttributesResult = amazonSQS.getQueueAttributes(
new GetQueueAttributesRequest(createDeadLetterQueueResult.getQueueUrl())
.withAttributeNames("QueueArn"));
String deadLetterQueueArn = getQueueAttributesResult.getAttributes().get("QueueArn");
//creating sqs queue
CreateQueueRequest createQueueRequest = new CreateQueueRequest();
createQueueRequest.addAttributesEntry("FifoQueue", "true");
createQueueRequest.addAttributesEntry("ContentBasedDeduplication", "false");
createQueueRequest.withQueueName(topicName + ".fifo");
String reDrivePolicy = "{\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\""
+ deadLetterQueueArn + "\"}";
createQueueRequest.addAttributesEntry("RedrivePolicy", reDrivePolicy);
CreateQueueResult createQueueResult = amazonSQS.createQueue(createQueueRequest);
String queueUrl = createQueueResult.getQueueUrl();
//getting ARN value of queue
getQueueAttributesResult = amazonSQS.getQueueAttributes(
new GetQueueAttributesRequest(queueUrl)
.withAttributeNames("QueueArn"));
String queueArn = getQueueAttributesResult.getAttributes().get("QueueArn");
//Subscribe FIFO queue to FIFO Topic
SubscribeRequest subscribeRequest = new SubscribeRequest();
subscribeRequest.withProtocol("sqs")
.withTopicArn(topicArn)
.withEndpoint(queueArn);
SubscribeResult subscribeResult = amazonSNS.subscribe(subscribeRequest);
Assertions.assertNotNull(subscribeResult.getSubscriptionArn());
//Publishing 4 sample messages to the FIFO SNS topic
for (int i = 0; i < 4; i++) {
PublishRequest publishRequest = new PublishRequest()
.withTopicArn(topicArn)
.withMessage("Test Message" + i)
.withMessageGroupId(topicName)
.withMessageDeduplicationId(UUID.randomUUID().toString());
PublishResult publishResult = amazonSNS.publish(publishRequest);
Assertions.assertNotNull(publishResult.getMessageId());
}
//Reading the ApproximateNumberOfMessages attribute of the FIFO queue
getQueueAttributesResult = amazonSQS.getQueueAttributes(
new GetQueueAttributesRequest(queueUrl)
.withAttributeNames("All"));
String approximateNumberOfMessages = getQueueAttributesResult.getAttributes()
.get("ApproximateNumberOfMessages");
//My expectation here is that the SNS FIFO topic fans out the 4 published messages to the SQS FIFO queue
Assertions.assertEquals(4, Integer.valueOf(approximateNumberOfMessages));
}
}
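A side note on the final assertion: as far as I know, the SQS documentation describes ApproximateNumberOfMessages as an approximate, eventually consistent value, so a single read right after publishing may under-report even when fan-out works. A minimal sketch of a retry loop follows. `QueueDepthPoller` and `awaitCount` are hypothetical names of my own (not part of the AWS SDK), and the helper only assumes it is handed something that returns the current count:

```java
import java.time.Duration;
import java.util.function.IntSupplier;

/** Hypothetical helper (not an AWS SDK class): re-reads a counter until it reaches a target. */
final class QueueDepthPoller {

    private QueueDepthPoller() {}

    /**
     * Calls readCount until it returns at least expected or the timeout elapses.
     * Returns the last value that was observed, so the caller can assert on it.
     */
    static int awaitCount(IntSupplier readCount, int expected,
                          Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        int observed = readCount.getAsInt();
        while (observed < expected && System.nanoTime() < deadline) {
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                return observed;
            }
            observed = readCount.getAsInt();
        }
        return observed;
    }
}
```

In the test above, the supplier would wrap the existing `amazonSQS.getQueueAttributes(...)` call and parse the "ApproximateNumberOfMessages" attribute with `Integer.parseInt`, and the assertion would then compare 4 against the value returned by `awaitCount`. This only rules out timing as a cause; it does not fix a missing queue permission.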
SNS Access policy (Permissions)
{
"Version": "2008-10-17",
"Id": "__default_policy_ID",
"Statement": [
{
"Sid": "__default_statement_ID",
"Effect": "Allow",
"Principal": {
"AWS": "*"
},
"Action": [
"SNS:GetTopicAttributes",
"SNS:SetTopicAttributes",
"SNS:AddPermission",
"SNS:RemovePermission",
"SNS:DeleteTopic",
"SNS:Subscribe",
"SNS:ListSubscriptionsByTopic",
"SNS:Publish",
"SNS:Receive"
],
"Resource": "arn:aws:sns:us-west-1:<account>:<topicName>.fifo",
"Condition": {
"StringEquals": {
"AWS:SourceOwner": "<account>"
}
}
}
]
}
SQS Access policy (Permissions)
{
"Version": "2012-10-17",
"Id": "arn:aws:sqs:us-west-1:<account>:<topicName>.fifo/SQSDefaultPolicy"
}
What am I missing? Why are the messages not present in the SQS queue? Do I need to change the SQS queue permissions to something like the following?
{
"Id": "Policy1611770719125",
"Version": "2012-10-17",
"Statement": [
{
"Sid": "Stmt1611770707743",
"Action": [
"sqs:GetQueueAttributes",
"sqs:GetQueueUrl",
"sqs:ListQueueTags",
"sqs:ListQueues",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:SendMessageBatch",
"sqs:SetQueueAttributes"
],
"Effect": "Allow",
"Resource": "arn:aws:sqs:us-west-1:<account>:<queueName>.fifo",
"Principal": {
"AWS": "*"
}
}
]
}
Upvotes: 0
Views: 1898
Reputation: 1082
If you are looking for a non-programmatic solution and would rather use the AWS Console, check out the 'How to Use SNS FIFO Topics' section in AWS' official documentation (link below), which says you need to add a statement to the access policy of each FIFO SQS queue you are trying to fan out to. This added statement grants the FIFO SNS topic permission to send messages to the queue.
Using their example, if you have a FIFO SNS topic called updates.fifo and you were trying to fan out messages to two queues, customer.fifo and loyalty.fifo, after subscribing the queues to the topic, you would navigate to the customer.fifo queue in the console and edit the access policy by adding the statement mentioned above:
{
"Effect": "Allow",
"Principal": {
"Service": "sns.amazonaws.com"
},
"Action": "SQS:SendMessage",
"Resource": "arn:aws:sqs:us-east-2:123412341234:customer.fifo",
"Condition": {
"ArnLike": {
"aws:SourceArn": "arn:aws:sns:us-east-2:123412341234:updates.fifo"
}
}
}
This would apply to the loyalty.fifo queue as well:
{
"Effect": "Allow",
"Principal": {
"Service": "sns.amazonaws.com"
},
"Action": "SQS:SendMessage",
"Resource": "arn:aws:sqs:us-east-2:123412341234:loyalty.fifo",
"Condition": {
"ArnLike": {
"aws:SourceArn": "arn:aws:sns:us-east-2:123412341234:updates.fifo"
}
}
}
Resource: Introducing Amazon SNS FIFO
Upvotes: 0
Reputation: 5502
Sharing my answer for posterity. As suspected, the actual issue is related to the access policy.
When we create a FIFO SNS topic and subscribe an SQS FIFO queue to it with AWS SDK v1, the queue's default access policy is as below:
{
"Version": "2012-10-17",
"Id": "arn:aws:sqs:us-west-1:<account>:<topicName>.fifo/SQSDefaultPolicy"
}
The default access policy is the same even when I create the SQS FIFO queue with AWS SDK v2. After I manually changed the access policy as below, the issue was resolved and the FIFO SNS topic fan-out works as specified:
{
"Statement": [
{
"Action": [
"sqs:*"
],
"Effect": "Allow",
"Resource": "arn:aws:sqs:us-west-1:<account>:<queueName>.fifo",
"Principal": {
"AWS": "*"
}
}
]
}
Code block to add the above access policy to every FIFO queue:
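// Needs Policy, Principal, Resource, Statement from com.amazonaws.auth.policy,
// SQSActions from com.amazonaws.auth.policy.actions,
// QueueAttributeName and SetQueueAttributesRequest from com.amazonaws.services.sqs.model,
// plus java.util.HashMap and java.util.Map.
// Allows every principal to perform every SQS action on this queue.
Policy policy = new Policy().withStatements(
new Statement(Statement.Effect.Allow)
.withPrincipals(Principal.AllUsers)
.withResources(new Resource(queueArn))
.withActions(SQSActions.AllSQSActions));
// The policy is stored on the queue as a JSON string under the "Policy" attribute.
Map<String, String> policyQueueAttributes = new HashMap<>();
policyQueueAttributes.put(QueueAttributeName.Policy.toString(), policy.toJson());
amazonSQS.setQueueAttributes(new SetQueueAttributesRequest(queueUrl, policyQueueAttributes));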
Adding the above code block after creating the SQS FIFO queue solved the issue.
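The policy above opens the queue to all principals and all actions. A narrower variant, matching the statement shown in the other answer, would allow only the SNS service to send messages, and only from one topic. Below is a rough sketch that builds that JSON with plain string concatenation. `SnsToSqsPolicy` and `forSubscription` are hypothetical names of my own, and the sketch assumes the ARNs contain no characters that need JSON escaping:

```java
/** Hypothetical helper (not an AWS SDK class): builds a narrowly scoped SQS access policy. */
final class SnsToSqsPolicy {

    private SnsToSqsPolicy() {}

    /**
     * Returns a policy document that lets the SNS service principal call
     * SendMessage on queueArn, restricted to deliveries from topicArn.
     * Assumes both ARNs are plain strings that need no JSON escaping.
     */
    static String forSubscription(String queueArn, String topicArn) {
        return "{"
            + "\"Version\":\"2012-10-17\","
            + "\"Statement\":[{"
            + "\"Effect\":\"Allow\","
            + "\"Principal\":{\"Service\":\"sns.amazonaws.com\"},"
            + "\"Action\":\"SQS:SendMessage\","
            + "\"Resource\":\"" + queueArn + "\","
            + "\"Condition\":{\"ArnLike\":{\"aws:SourceArn\":\"" + topicArn + "\"}}"
            + "}]}";
    }
}
```

The returned string could be stored under the queue's Policy attribute in place of `policy.toJson()`, for example `policyQueueAttributes.put(QueueAttributeName.Policy.toString(), SnsToSqsPolicy.forSubscription(queueArn, topicArn))`. I have not verified this variant against the setup in the question, so treat it as a starting point.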
Upvotes: 2