Leandro Castilho

Reputation: 21

Listener SQS stops consuming messages

Hi, I'm trying to consume messages from a queue in my Java application, but after a while my consumer stops polling the queue. My application has two listeners, and the busier of the two queues is usually the one that stops being consumed. This is my AWS configuration:

@Configuration
public class AWSConfiguration {

    private final String region;
    private final String accessKey;
    private final String secretKey;

    public AWSConfiguration(
            @Value("${cloud.aws.region.static}") final String region,
            @Value("${cloud.aws.credentials.accessKey}") final String accessKey,
            @Value("${cloud.aws.credentials.secretKey}") final String secretKey) {
        this.region = region;
        this.accessKey = accessKey;
        this.secretKey = secretKey;
    }

    private AWSCredentialsProvider getAwsCredentials(final String accessKey, final String secretKey) {
        final BasicAWSCredentials basicAWSCredentials = new BasicAWSCredentials(accessKey, secretKey);
        return new AWSStaticCredentialsProvider(basicAWSCredentials);
    }

    @Bean
    @Primary
    public AmazonSQSAsync amazonSQSAsync(){
        return AmazonSQSAsyncClientBuilder
                .standard()
                .withCredentials(getAwsCredentials(accessKey, secretKey))
                .withRegion(region)
                .build();
    }
}

And these are my SQSListeners:

@SqsListener(value = "${cloud.aws.sqs.name.image-mosaic-demand-receive-mosaics}", deletionPolicy = ON_SUCCESS)
public void onReceiveMosaics(@Payload String message) {
    LOGGER.trace("m=receiveMosaics(message={})", message);

    GlebeDTO glebeDTO = receiveMosaicsMessageHandler.parseReceiveMessage(message);

    LOGGER.info("Mosaics received for field {}", glebeDTO.getExternalId());

    // Branch on whether the message carried any mosaics.
    if (!glebeDTO.getMosaics().isEmpty()) {
        processMosaicReceivedUseCase.processWithMosaics(buildCriteria(glebeDTO));
    } else {
        processMosaicReceivedUseCase.processWithoutMosaics(buildCriteria(glebeDTO));
    }
}

@SqsListener(value = "${cloud.aws.sqs.name.image-mosaic-demand-order-request}", deletionPolicy = ON_SUCCESS)
public void receiveOrder(@Payload String message) {
    LOGGER.trace("m=receiveOrder(message={})", message);

    OrderRequestDTO orderRequestDTO = this.requestOrderMessageHandler.parseReceiveMessage(message);

    LOGGER.info("{} - Mosaic request received", orderRequestDTO.getOrderId());

    validator.validate(orderRequestDTO);

    Demand demand = orderRequestDTOToDemandConverter.convert(orderRequestDTO);

    receiveRequestUseCase.onReceiveRequest(demand);
}

Is there any configuration I am missing?

Upvotes: 2

Views: 1061

Answers (1)

joensson

Reputation: 2037

I know this is an old question by now, and I don't have an answer, but we are experiencing something similar, or maybe even the same issue.

Our @SqsListener stops ingesting data after running fine for several days and ingesting hundreds of thousands of messages without issue.

We have extensive monitoring and logging in place, but the monitoring shows that RAM and CPU usage when this happens is no higher than when everything is fine, and our logs contain no errors before or after the listener stops.

It seems like the threads used by the SqsListener just stop or die and never come back to life, so we have to restart the Java service to get the integration going again.
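Since nothing shows up in the logs, one thing that might at least make the stall visible is a small watchdog that remembers when a listener last handled a message. The following is only a minimal sketch under my own assumptions, not a fix and not part of Spring Cloud AWS: `ListenerStallDetector`, `recordMessage` and `isStalled` are hypothetical names, and the silence threshold is something you would have to choose for your own traffic.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
 * Hypothetical helper (not a Spring Cloud AWS class): remembers when a
 * listener last handled a message and reports prolonged silence.
 */
final class ListenerStallDetector {

    private final Supplier<Instant> now;       // time source, injectable for tests
    private final Duration maxSilence;         // assumed threshold, pick per queue
    private final AtomicReference<Instant> lastMessageAt;

    ListenerStallDetector(Supplier<Instant> now, Duration maxSilence) {
        this.now = now;
        this.maxSilence = maxSilence;
        // Start the clock at construction so a fresh service is not "stalled".
        this.lastMessageAt = new AtomicReference<>(now.get());
    }

    /** Call this from inside each listener method when a message arrives. */
    void recordMessage() {
        lastMessageAt.set(now.get());
    }

    /** True when no message has been recorded for longer than maxSilence. */
    boolean isStalled() {
        Duration silence = Duration.between(lastMessageAt.get(), now.get());
        return silence.compareTo(maxSilence) > 0;
    }
}
```

The idea would be to call `recordMessage()` at the top of the `@SqsListener` method and check `isStalled()` from a scheduled task or a health endpoint. Silence alone does not prove the listener is dead, because an empty queue looks the same, so it probably makes sense to alert only when the queue's `ApproximateNumberOfMessages` attribute is also greater than zero.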

Upvotes: 1
