Rafael Ferreira Rocha

Reputation: 987

How can I pass MessageGroupId to a FIFO SNS topic?

I've tried the following code:

private final NotificationMessagingTemplate notificationMessagingTemplate;

public void send(final T payload, final Object groupId) {
    final ImmutableMap<String, Object> headers = ImmutableMap.of("message-group-id", groupId.toString(),
            "message-deduplication-id", UUID.randomUUID().toString());
    notificationMessagingTemplate.convertAndSend(topicName, payload, headers);
}

Passing those headers works fine with SQS, but with SNS it fails with this error:

Caused by: com.amazonaws.services.sns.model.InvalidParameterException: Invalid parameter: The MessageGroupId parameter is required for FIFO topics (Service: AmazonSNS; Status Code: 400; Error Code: InvalidParameter; Request ID: 1aa83814-abc8-56e9-ae15-619723438fe9; Proxy: null)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819) ~[aws-java-sdk-core-1.11.933.jar:na]

Do I have to change the headers, or is there another way around this?

Upvotes: 2

Views: 7737

Answers (3)

cs4r

Reputation: 624

I solved it with the following:

SnsNotification<Event> notification = SnsNotification.builder(event)
        .groupId("theGroupId")
        .deduplicationId("theDeduplicationId")
        .build();

snsTemplate.sendNotification(snsTopicArn, notification);

Upvotes: 0

If you don't want to upgrade spring-cloud-aws as suggested in https://github.com/spring-attic/spring-cloud-aws/issues/714, you can do the following:

  1. Create a handler to intercept the message before it's sent
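import com.amazonaws.Request;
import com.amazonaws.handlers.RequestHandler2;
import com.amazonaws.services.sns.model.MessageAttributeValue;
import com.amazonaws.services.sns.model.PublishRequest;

public class SnsFifoHandler extends RequestHandler2 {

  @Override
  public void beforeRequest(Request<?> request) {
    super.beforeRequest(request);

    if (request.getOriginalRequest() instanceof PublishRequest) {
      PublishRequest publishRequest = (PublishRequest) request.getOriginalRequest();
      String topicArn = publishRequest.getTopicArn();

      // FIFO topic names always end in ".fifo"; a plain contains("fifo") would
      // also match standard topics that merely have "fifo" in their name.
      if (topicArn != null && topicArn.endsWith(".fifo")) {
        // Copy the values sent as message attributes into the top-level
        // request parameters that SNS requires for FIFO topics.
        copyAttribute(request, publishRequest, "MessageGroupId");
        copyAttribute(request, publishRequest, "MessageDeduplicationId");
      }
    }
  }

  private static void copyAttribute(Request<?> request, PublishRequest publishRequest, String name) {
    MessageAttributeValue value = publishRequest.getMessageAttributes().get(name);
    if (value != null) { // skip attributes the caller did not set
      request.addParameter(name, value.getStringValue());
    }
  }
}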
  2. Make it a Spring Bean
@Bean
public SnsFifoHandler snsFifoHandler() {
    return new SnsFifoHandler();
}
  3. Use it for constructing the AmazonSNSAsync bean instance
@Bean
public AmazonSNSAsync amazonSNSAsync(SnsFifoHandler snsFifoHandler) {
  return AmazonSNSAsyncClientBuilder.standard().withRegion(region).withRequestHandlers(snsFifoHandler).build();
}
  4. Finally, send the message
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.NotificationMessagingTemplate;
import org.springframework.stereotype.Service;

@Service
public class MessagingService {

    @Autowired
    private NotificationMessagingTemplate notificationMessagingTemplate;

    public void sendToFifoTopic(Object message, String topicName, String messageGroupId, String messageDeduplicationId) {
        notificationMessagingTemplate.convertAndSend(
                topicName,
                message,
                Map.of("MessageGroupId", messageGroupId, "MessageDeduplicationId", messageDeduplicationId));
    }
}
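
The handler in the first step uses the topic ARN to decide whether to add the FIFO parameters. Below is a minimal sketch of that check in isolation, relying on the AWS rule that FIFO topic names end in ".fifo". The class name, method name, and ARNs are made up for illustration and are not part of the AWS SDK or spring-cloud-aws.

```java
// Hypothetical helper, not part of any library: decides whether an SNS topic
// ARN refers to a FIFO topic by looking at the ".fifo" suffix.
final class FifoTopicArns {

    private FifoTopicArns() {
    }

    /** Returns true only when the ARN is non-null and ends with ".fifo". */
    static boolean isFifoTopic(String topicArn) {
        return topicArn != null && topicArn.endsWith(".fifo");
    }

    public static void main(String[] args) {
        // Illustrative ARNs with a placeholder account ID.
        System.out.println(isFifoTopic("arn:aws:sns:us-east-1:123456789012:orders.fifo")); // true
        System.out.println(isFifoTopic("arn:aws:sns:us-east-1:123456789012:fifo-audit"));  // false
    }
}
```

A suffix check avoids adding FIFO parameters to a standard topic that only happens to have "fifo" somewhere in its name.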

Upvotes: 2

Rafael Ferreira Rocha

Reputation: 987

I found a workaround myself by using the AWS SDK directly, without Spring. Because FIFO support for SNS is new, Spring has not implemented a solution for this yet, and I could not find a way to pass these parameters to the topic through Spring. This link helped me solve it: https://docs.aws.amazon.com/sns/latest/dg/fifo-topic-code-examples.html

Here is what my method ended up looking like:

private final String topicArn;
private final AmazonSNS amazonSNS;
private final ObjectMapper objectMapper;

public void send(final T payload, final Object groupId) {

    try {
        amazonSNS.publish(new PublishRequest()
                .withTopicArn(topicArn)
                .withMessageDeduplicationId(UUID.randomUUID().toString())
                .withMessage(objectMapper.writeValueAsString(payload))
                .withMessageGroupId(groupId.toString()));
    } catch (final IOException e) {
        throw new RuntimeException(e);
    }
}
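
Note that a random UUID gives every publish call a new deduplication ID, so a retried publish of the same payload will not be deduplicated by SNS. If that matters, one option is to derive the ID from the message body instead. The following is only a sketch under that assumption: the class and method names are hypothetical, and it hashes the serialized body with SHA-256, which yields a 64-character hex string.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper, not part of the AWS SDK: derives a stable
// deduplication ID from the serialized message body.
final class DeduplicationIds {

    private DeduplicationIds() {
    }

    /** Returns the lowercase hex SHA-256 digest of the UTF-8 encoded body. */
    static String fromBody(String messageBody) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(messageBody.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                // Mask to an unsigned value so each byte prints as two hex digits.
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every standard Java platform is required to provide SHA-256.
            throw new IllegalStateException("SHA-256 is not available", e);
        }
    }
}
```

With such a helper, the call in the method above could become `.withMessageDeduplicationId(DeduplicationIds.fromBody(json))`, where `json` is the already serialized payload. The trade-off is that two intentionally identical messages sent close together would then be treated as duplicates.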

Upvotes: 2
