Reputation: 987
I've tried the following code:
private final NotificationMessagingTemplate notificationMessagingTemplate;

public void send(final T payload, final Object groupId) {
    final ImmutableMap<String, Object> headers = ImmutableMap.of(
            "message-group-id", groupId.toString(),
            "message-deduplication-id", UUID.randomUUID().toString());
    notificationMessagingTemplate.convertAndSend(topicName, payload, headers);
}
Passing those headers to SQS works fine, but with SNS the publish fails with this error:
Caused by: com.amazonaws.services.sns.model.InvalidParameterException: Invalid parameter: The MessageGroupId parameter is required for FIFO topics (Service: AmazonSNS; Status Code: 400; Error Code: InvalidParameter; Request ID: 1aa83814-abc8-56e9-ae15-619723438fe9; Proxy: null)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819) ~[aws-java-sdk-core-1.11.933.jar:na]
Do I have to change the headers, or is there another way around this?
Upvotes: 2
Views: 7737
Reputation: 624
I solved it with the following:
SnsNotification<Event> notification = SnsNotification.builder(event)
        .groupId("theGroupId")
        .deduplicationId("theDeduplicationId")
        .build();
snsTemplate.sendNotification(snsTopicArn, notification);
Upvotes: 0
Reputation: 53
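If you don't want to update spring-cloud-aws as mentioned here: https://github.com/spring-attic/spring-cloud-aws/issues/714, you can register a request handler that copies the MessageGroupId and MessageDeduplicationId message attributes into top-level request parameters: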
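import com.amazonaws.Request;
import com.amazonaws.handlers.RequestHandler2;
import com.amazonaws.services.sns.model.MessageAttributeValue;
import com.amazonaws.services.sns.model.PublishRequest;
import java.util.Map;

// Copies the FIFO message attributes into top-level SNS request parameters.
public class SnsFifoHandler extends RequestHandler2 {

    @Override
    public void beforeRequest(Request<?> request) {
        super.beforeRequest(request);
        if (!(request.getOriginalRequest() instanceof PublishRequest)) {
            return;
        }
        PublishRequest publishRequest = (PublishRequest) request.getOriginalRequest();
        String topicArn = publishRequest.getTopicArn();
        // FIFO topic names always end with ".fifo"; the ARN is null for non-topic targets.
        if (topicArn == null || !topicArn.endsWith(".fifo")) {
            return;
        }
        copyAttribute(request, publishRequest.getMessageAttributes(), "MessageGroupId");
        copyAttribute(request, publishRequest.getMessageAttributes(), "MessageDeduplicationId");
    }

    // Adds the attribute as a request parameter, skipping it when absent instead of throwing.
    private static void copyAttribute(Request<?> request,
            Map<String, MessageAttributeValue> attributes, String name) {
        MessageAttributeValue value = attributes == null ? null : attributes.get(name);
        if (value != null && value.getStringValue() != null) {
            request.addParameter(name, value.getStringValue());
        }
    }
}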

@Bean
public SnsFifoHandler snsFifoHandler() {
    return new SnsFifoHandler();
}

@Bean
public AmazonSNSAsync amazonSNSAsync(SnsFifoHandler snsFifoHandler) {
    return AmazonSNSAsyncClientBuilder.standard()
            .withRegion(region)
            .withRequestHandlers(snsFifoHandler)
            .build();
}

import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.NotificationMessagingTemplate;
import org.springframework.stereotype.Service;

@Service
public class MessagingService {

    @Autowired
    private NotificationMessagingTemplate notificationMessagingTemplate;

    public void sendToFifoTopic(Object message, String topicName, String messageGroupId, String messageDeduplicationId) {
        notificationMessagingTemplate.convertAndSend(
                topicName,
                message,
                Map.of("MessageGroupId", messageGroupId, "MessageDeduplicationId", messageDeduplicationId));
    }
}
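
One detail worth a sketch is how the handler recognizes a FIFO topic. SNS requires FIFO topic names to end with ".fifo", so a suffix check on the ARN is stricter than a substring match, which would also match a standard topic named something like "fifo-orders". The following is a minimal, stdlib-only sketch; the class and method names are hypothetical and the ARNs are made-up examples:

```java
public final class FifoTopics {

    private FifoTopics() {
    }

    // Returns true when the ARN names a FIFO topic, i.e. the topic name ends with ".fifo".
    // Null-safe, because a publish request may carry no topic ARN at all.
    public static boolean isFifoTopic(String topicArn) {
        return topicArn != null && topicArn.endsWith(".fifo");
    }

    public static void main(String[] args) {
        // Hypothetical ARNs, for illustration only.
        System.out.println(isFifoTopic("arn:aws:sns:us-east-1:123456789012:orders.fifo"));
        System.out.println(isFifoTopic("arn:aws:sns:us-east-1:123456789012:fifo-orders"));
    }
}
```

The first call prints true and the second prints false, whereas a contains("fifo") check would return true for both.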
Upvotes: 2
Reputation: 987
I found a workaround myself by using the AWS SDK directly, without Spring. Because FIFO support in SNS is new, Spring had not implemented it yet, and I could not find a way to pass these parameters to the topic through Spring. This link helped me solve it: https://docs.aws.amazon.com/sns/latest/dg/fifo-topic-code-examples.html
Here is my method:
private final String topicArn;
private final AmazonSNS amazonSNS;
private final ObjectMapper objectMapper;

public void send(final T payload, final Object groupId) {
    try {
        amazonSNS.publish(new PublishRequest()
                .withTopicArn(topicArn)
                .withMessageDeduplicationId(UUID.randomUUID().toString())
                .withMessage(objectMapper.writeValueAsString(payload))
                .withMessageGroupId(groupId.toString()));
    } catch (final IOException e) {
        throw new RuntimeException(e);
    }
}
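
Note that a random UUID makes every publish unique, so a retried send of the same payload will not be deduplicated by SNS. If that matters, one option is to derive the deduplication ID from the message content instead. The following is a minimal, stdlib-only sketch, assuming the serialized payload is the right identity for a message; DeduplicationIds and fromContent are hypothetical names, not part of any AWS or Spring API:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public final class DeduplicationIds {

    private DeduplicationIds() {
    }

    // Returns the lowercase hex SHA-256 digest of the message body (64 characters),
    // so identical bodies always map to the same deduplication ID.
    public static String fromContent(String messageBody) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(messageBody.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide SHA-256.
            throw new IllegalStateException("SHA-256 is not available", e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical JSON body, for illustration only.
        String body = "{\"orderId\":42}";
        System.out.println(fromContent(body));
    }
}
```

The result could be passed to withMessageDeduplicationId(...) in place of the random UUID, using the same string that is passed to withMessage(...). Whether content-based IDs are appropriate depends on whether two identical payloads sent close together should really count as one message.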
Upvotes: 2