Ximon

Reputation: 143

How to set a Message Handler programmatically in Spring Cloud AWS SQS?

Maybe someone has an idea for the following problem:

I am currently working on a project where I want to use AWS SQS through the Spring Cloud integration. For the receiver part I want to provide an API with which a user can register a "message handler" on a queue. The handler is an interface that will contain the user's business logic, e.g.

MyAwsSqsReceiver receiver = new MyAwsSqsReceiver();
receiver.register("a-queue-name", new MessageHandler(){
  @Override
  public void handle(String message){
    //... business logic for the received message
  }
});
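
To make the goal more concrete, the API shape I have in mind could be sketched roughly as follows. This is only an illustration: `HandlerRegistry` and `dispatch` are hypothetical names, and `dispatch` merely stands in for whatever component actually receives the messages from SQS.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical callback interface carrying the user's business logic.
interface MessageHandler {
    void handle(String message);
}

// Hypothetical registry: maps queue names to handlers at runtime.
class HandlerRegistry {
    private final Map<String, MessageHandler> handlers = new ConcurrentHashMap<>();

    // Registers (or replaces) the handler for the given queue name.
    void register(String queueName, MessageHandler handler) {
        handlers.put(queueName, handler);
    }

    // Stand-in for the receiving side: routes a message to the handler
    // registered for the queue. Returns false if no handler is registered.
    boolean dispatch(String queueName, String message) {
        MessageHandler handler = handlers.get(queueName);
        if (handler == null) {
            return false;
        }
        handler.handle(message);
        return true;
    }
}
```

The open question is how to get the SQS integration to call something like `dispatch` for queue names that are only known at runtime.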

I found examples, e.g. https://codemason.me/2016/03/12/amazon-aws-sqs-with-spring-cloud/, and read the documentation at http://cloud.spring.io/spring-cloud-aws/spring-cloud-aws.html#_sqs_support

But the only way I found there to "connect" processing logic to an incoming message is an annotation on a method, e.g. @SqsListener or @MessageMapping.

These annotations are bound to a fixed queue name, though. So now I am at a loss as to how to dynamically "connect" the "MessageHandler" supplied through my API to the incoming messages of the specified queue.

In the example's configuration there is a SimpleMessageListenerContainer, which has a QueueMessageHandler set on it. This QueueMessageHandler does not seem to be the right place to set my handler, though, and overriding its methods in my own subclass of QueueMessageHandler does not look right either.

I have already done something like this with the Spring AMQP integration and RabbitMQ, and thought it would work similarly with AWS SQS.

Does anyone have an idea how to accomplish this?

thx + bye, Ximon

EDIT:

I found that Spring JMS can actually do this, e.g. www.javacodegeeks.com/2016/02/aws-sqs-spring-jms-integration.html. Does anybody know what consequences, good or bad, using the JMS protocol has here?

Upvotes: 10

Views: 3935

Answers (3)

tanishq porwar

Reputation: 11

I was facing a similar problem.

I came across Spring Cloud AWS 3.0, in which the SQS integration was completely rewritten along the lines of the Spring Kafka architecture. I was able to register SQS listeners dynamically by creating a MessageListenerContainer from an SqsEndpoint and registering it with the MessageListenerContainerRegistry. The container's start() and stop() methods can then be used to start and stop it.

Dependencies:

implementation 'io.awspring.cloud:spring-cloud-aws-starter:3.1.1'
implementation 'io.awspring.cloud:spring-cloud-aws-sqs:3.1.1'

// for testing
testImplementation "org.testcontainers:localstack"

Bean for the MessageListenerContainerFactory:

// can inject a custom SqsAsyncClient with the different aws creds
@Bean
  SqsMessageListenerContainerFactory<String> defaultSqsListenerContainerFactory(
      SqsAsyncClient sqsAsyncClient) {
    return SqsMessageListenerContainerFactory
        .<String>builder()
        .sqsAsyncClient(sqsAsyncClient)
        .build();
  }

EventSource:

@Slf4j
public class SqsEventSource implements MessageListener<String> {

  private final String id;
  private final SqsMessageListenerContainerFactory<String> sqsMessageListenerContainerFactory;
  private final DefaultListenerContainerRegistry defaultListenerContainerRegistry;
  private final EndpointRegistrar endpointRegistrar;
  private final MessageListenerContainer<String> messageListenerContainer;

  public SqsEventSource(
      SqsMessageListenerContainerFactory<String> sqsMessageListenerContainerFactory,
      DefaultListenerContainerRegistry defaultListenerContainerRegistry,
      EndpointRegistrar endpointRegistrar, Properties config)
      throws NoSuchMethodException {
    this.endpointRegistrar = endpointRegistrar;
    this.sqsMessageListenerContainerFactory = sqsMessageListenerContainerFactory;
    this.defaultListenerContainerRegistry = defaultListenerContainerRegistry;

    var sqsProps = new SqsProperties(config);
    this.id = sqsProps.getId();

    var sqsEndpoint = buildSqsEndpoint(sqsProps);

    DefaultMessageHandlerMethodFactory handlerMethodFactory = new DefaultMessageHandlerMethodFactory();
    configureDefaultHandlerMethodFactory(handlerMethodFactory);
    sqsEndpoint.setHandlerMethodFactory(handlerMethodFactory);
    sqsEndpoint.setBean(this);

    switch (sqsEndpoint.getAcknowledgementMode()) {
      case MANUAL -> sqsEndpoint.setMethod(
          this.getClass().getMethod("onMessage", Message.class, Acknowledgement.class));
      case ON_SUCCESS, ALWAYS -> sqsEndpoint.setMethod(this.getClass().getMethod(
          "onMessage", Message.class));
    }

    this.messageListenerContainer = this.sqsMessageListenerContainerFactory.createContainer(
        sqsEndpoint);
  }

  private SqsEndpoint buildSqsEndpoint(SqsProperties props) {
    return new SqsEndpointBuilder()
        .id(props.getId())
        .queueNames(List.of(props.getQueueNames()))
        .factoryBeanName(props.getFactoryBeanName())
        .maxConcurrentMessages(props.getMaxConcurrentMessages())
        .pollTimeoutSeconds(props.getPollTimeoutSeconds())
        .maxMessagesPerPoll(props.getMaxMessagesPerPoll())
        .messageVisibility(props.getMessageVisibility())
        .acknowledgementMode(props.getAcknowledgementMode())
        .build();
  }


  public void start() {

    var container = this.defaultListenerContainerRegistry.getContainerById(
        messageListenerContainer.getId());
    if (Objects.nonNull(container)) {
      container.start();
      return;
    }

    this.defaultListenerContainerRegistry.registerListenerContainer(messageListenerContainer);
    this.defaultListenerContainerRegistry.getContainerById(messageListenerContainer.getId())
        .start();

  }


  public boolean isRunning() {
    var container = this.defaultListenerContainerRegistry.getContainerById(
        messageListenerContainer.getId());
    // The container is only registered by start(), so guard against null.
    return Objects.nonNull(container) && container.isRunning();
  }


  public void restart() {
    var container = this.defaultListenerContainerRegistry.getContainerById(
        messageListenerContainer.getId());
    if (Objects.nonNull(container)) {
      container.stop();
    }
    // start() registers the container first if that has not happened yet.
    start();
  }

  @Override
  public void onMessage(Message<String> message) {
    log.info("Received message: {}", message);
  }

  public void onMessage(Message<String> message, Acknowledgement acknowledgement) {
    log.info("Received message with Ack: {}", message);
    acknowledgement.acknowledge();
  }

  private CompositeMessageConverter createCompositeMessageConverter() {
    List<MessageConverter> messageConverters = new ArrayList<>();
    messageConverters.add(new StringMessageConverter());
    messageConverters.add(new SimpleMessageConverter());
    messageConverters.add(
        createDefaultMappingJackson2MessageConverter(this.endpointRegistrar.getObjectMapper()));
    this.endpointRegistrar.getMessageConverterConsumer().accept(messageConverters);
    return new CompositeMessageConverter(messageConverters);
  }

  private void configureDefaultHandlerMethodFactory(
      DefaultMessageHandlerMethodFactory handlerMethodFactory) {
    CompositeMessageConverter compositeMessageConverter = createCompositeMessageConverter();

    List<HandlerMethodArgumentResolver> methodArgumentResolvers = new ArrayList<>(
        createArgumentResolvers(compositeMessageConverter));
    this.endpointRegistrar.getMethodArgumentResolversConsumer().accept(methodArgumentResolvers);
    handlerMethodFactory.setArgumentResolvers(methodArgumentResolvers);
    handlerMethodFactory.afterPropertiesSet();
  }

  private List<HandlerMethodArgumentResolver> createArgumentResolvers(
      MessageConverter messageConverter) {
    return Arrays.asList(
        new AcknowledgmentHandlerMethodArgumentResolver(),
        new BatchAcknowledgmentArgumentResolver(),
        new HeadersMethodArgumentResolver(),
        new BatchPayloadMethodArgumentResolver(messageConverter,
            this.endpointRegistrar.getValidator()),
        new MessageMethodArgumentResolver(messageConverter),
        new PayloadMethodArgumentResolver(messageConverter, this.endpointRegistrar.getValidator()));
  }

  private MappingJackson2MessageConverter createDefaultMappingJackson2MessageConverter(
      ObjectMapper objectMapper) {
    MappingJackson2MessageConverter jacksonMessageConverter = new MappingJackson2MessageConverter();
    jacksonMessageConverter.setSerializedPayloadClass(String.class);
    jacksonMessageConverter.setStrictContentTypeMatch(false);
    if (objectMapper != null) {
      jacksonMessageConverter.setObjectMapper(objectMapper);
    }
    return jacksonMessageConverter;
  }
}

Upvotes: 0

Guilherme Alencar

Reputation: 1403

I am facing the same issue.

I am taking a somewhat unusual approach: I set up an AWS client bean up front and then, instead of using the @SqsListener annotation to consume from one specific queue, I use the @Scheduled annotation. That lets me poll (every 10 seconds in my case) whichever queues I choose programmatically.

The example below iterates over the queues defined in properties and consumes from each one.

Client Bean:

@Bean
@Primary
public AmazonSQSAsync awsSqsClient() {
    return AmazonSQSAsyncClientBuilder
            .standard()
            .withRegion(Regions.EU_WEST_1.getName())
            .build();
}

Consumer:

// injected in the constructor
private final AmazonSQSAsync awsSqsClient;

// fixedDelay: the next run starts 10 s after the previous run has finished
@Scheduled(fixedDelay = 10000)
public void poll() {
    properties.getSqsQueues()
            .forEach(queue -> {
                val receiveMessageRequest = new ReceiveMessageRequest(queue)
                        .withWaitTimeSeconds(10)
                        .withMaxNumberOfMessages(10);

                // reading the messages
                val result = awsSqsClient.receiveMessage(receiveMessageRequest);
                val sqsMessages = result.getMessages();
                log.info("Received Message on queue {}: message = {}", queue, sqsMessages.toString());

                // deleting the messages
                sqsMessages.forEach(message -> {
                    val deleteMessageRequest = new DeleteMessageRequest(queue, message.getReceiptHandle());
                    awsSqsClient.deleteMessage(deleteMessageRequest);
                });
            });
}

Just to clarify: in my case I need multiple queues, one per tenant, with the queue URL for each one passed in a properties file. In your case you could of course get the queue names from another source, for example a shared, thread-safe collection holding the queues you have created at runtime (a plain ThreadLocal would not be visible to the scheduler thread).
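
As a rough sketch of that idea, the queue URLs could live in a thread-safe set that is modified at runtime and read on every polling round. Everything here is an assumption made for illustration: `QueueClient` is a hypothetical stand-in for the receive and delete calls on the SQS client (so the example runs without AWS), messages are simplified to plain strings instead of carrying receipt handles, and `DynamicQueuePoller` is an invented name.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical abstraction over the receive/delete calls of the SQS client.
interface QueueClient {
    List<String> receive(String queueUrl);
    void delete(String queueUrl, String message);
}

class DynamicQueuePoller {
    // Thread-safe set, so queues can be added or removed while polling runs.
    private final Set<String> queueUrls = ConcurrentHashMap.newKeySet();
    private final QueueClient client;
    private final Consumer<String> handler;

    DynamicQueuePoller(QueueClient client, Consumer<String> handler) {
        this.client = client;
        this.handler = handler;
    }

    void addQueue(String queueUrl) {
        queueUrls.add(queueUrl);
    }

    void removeQueue(String queueUrl) {
        queueUrls.remove(queueUrl);
    }

    // One polling round; in the consumer above this would be the @Scheduled method.
    void pollOnce() {
        for (String queueUrl : queueUrls) {
            for (String message : client.receive(queueUrl)) {
                try {
                    handler.accept(message);
                    // Delete only after the handler completed without an exception.
                    client.delete(queueUrl, message);
                } catch (RuntimeException e) {
                    // Leave the message on the queue so it can be delivered again.
                }
            }
        }
    }
}
```

Unlike the consumer above, this sketch deletes a message only after the handler has returned normally, so a failing handler leaves the message to be redelivered.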

If you wish, you can also try the JMS approach, where you create message consumers and attach a listener to each one you need (see the AWS JMS documentation).

Upvotes: 2

haskovec

Reputation: 370

When we use Spring with SQS, we use spring-cloud-starter-aws-messaging.

Then just create a listener class:

@Component
public class MyListener {

    // The annotation is spelled @SqsListener; the queue name is fixed here.
    @SqsListener(value = "myqueue")
    public void listen(MyMessageType message) {
        // process the message
    }
}

Upvotes: -1
