Maarten Dhondt
Maarten Dhondt

Reputation: 657

Backoff policy for Spring Cloud Stream consumer with GCP PubSub

In a Spring Boot application I'm using spring-cloud-stream for PubSub (spring-cloud-gcp-pubsub-stream-binder) to subscribe to a topic.

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
</dependency>

I use the @EnableBinding and @StreamListener annotations to set up the subscriber:

@EnableBinding(Sink.class)
class Subscriber {

    @StreamListener(INPUT)
    public void handleMessage(Message<String> message) {
        ...
    }
}

During the handling of the message it is possible that something goes wrong. In that case I throw an Exception to make sure the message will not get acknowledged and be retried at a later time.

According to the spring cloud stream documentation I should be able to use the properties

spring.cloud.stream.default.consumer.defaultRetryable=true
spring.cloud.stream.default.consumer.backOffInitialInterval=1000
spring.cloud.stream.default.consumer.backOffMultiplier=2.0
spring.cloud.stream.default.consumer.backOffMaxInterval=300000
spring.cloud.stream.default.consumer.maxAttempts=9999

or for a specific channel (input in this case)

spring.cloud.stream.bindings.input.consumer.defaultRetryable=true
spring.cloud.stream.bindings.input.consumer.backOffInitialInterval=1000
spring.cloud.stream.bindings.input.consumer.backOffMultiplier=2.0
spring.cloud.stream.bindings.input.consumer.backOffMaxInterval=300000
spring.cloud.stream.bindings.input.consumer.maxAttempts=9999

But those properties do not seem to be used in my application. The message gets retried every 100ms regardless of what values in use in the above properties.

Can anyone help me with setting the correct retry and/or backoff settings so that messages get retried accordingly?

A fully working minimal example to illustrate my issue can be found on GitHub and looks like this:

Producer:

@Component
public class Main {

    private static final Logger LOG = getLogger(Main.class);

    private boolean firstExecution = true;

    @Autowired
    private SuccessSwitch consumerSuccessSwitch;
    @Autowired
    private PubSubTemplate pubSubTemplate;

    @Scheduled(fixedDelay = 10000)
    public void doSomethingAfterStartup() {
        if (firstExecution) {
            firstExecution = false;
            consumerSuccessSwitch.letFail();

            pubSubTemplate.publish("topic", "payload");
            LOG.info("Message published");
        } else {
            consumerSuccessSwitch.letSucceed();
        }
    }
}

Consumer:

@EnableBinding(Sink.class)
class Subscriber {

    private static final Logger LOG = getLogger(Subscriber.class);

    @Autowired
    private SuccessSwitch successSwitch;
    private int retryCounter = 0;

    @StreamListener(INPUT)
    public void handleMessage(Message<String> message) {
        LOG.info("Received: {} for the {} time", message.getPayload(), ++retryCounter);

        if (!successSwitch.succeeded()) {
            throw new RuntimeException();
        }
        LOG.info("Received: {} times", retryCounter);
    }
}

Toggle ack/nack in consumer:

@Component
public class SuccessSwitch {

    private boolean success = false;

    public void letSucceed() {
        this.success = true;
    }

    public void letFail() {
        this.success = false;
    }

    public boolean succeeded() {
        return success;
    }
}

Upvotes: 1

Views: 768

Answers (1)

dan carter
dan carter

Reputation: 4361

Looking at PubSubChannelProvisioner , in the gcp-pubsub binding. When creating a subscription the binding does not configure the retry policy. So unless the retry is somehow handled within spring-cloud-stream instead of the underlying native pub-sub mechanisms, you are out of luck.

What i am considering to do is to create the subscription myself using PubSubAdmin and then spring-cloud-stream will see the existing subscription with the correct retry policy and use it.

Upvotes: 0

Related Questions