Reputation: 657
In a Spring Boot application I'm using spring-cloud-stream for PubSub (spring-cloud-gcp-pubsub-stream-binder) to subscribe to a topic.
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
</dependency>
I use the @EnableBinding and @StreamListener annotations to set up the subscriber:
@EnableBinding(Sink.class)
class Subscriber {
    @StreamListener(INPUT)
    public void handleMessage(Message<String> message) {
        ...
    }
}
Something can go wrong while a message is being handled. In that case I throw an exception so that the message is not acknowledged and gets retried later.
According to the Spring Cloud Stream documentation, I should be able to use these properties:
spring.cloud.stream.default.consumer.defaultRetryable=true
spring.cloud.stream.default.consumer.backOffInitialInterval=1000
spring.cloud.stream.default.consumer.backOffMultiplier=2.0
spring.cloud.stream.default.consumer.backOffMaxInterval=300000
spring.cloud.stream.default.consumer.maxAttempts=9999
or for a specific channel (input in this case)
spring.cloud.stream.bindings.input.consumer.defaultRetryable=true
spring.cloud.stream.bindings.input.consumer.backOffInitialInterval=1000
spring.cloud.stream.bindings.input.consumer.backOffMultiplier=2.0
spring.cloud.stream.bindings.input.consumer.backOffMaxInterval=300000
spring.cloud.stream.bindings.input.consumer.maxAttempts=9999
But those properties do not seem to be used in my application. The message gets retried every 100 ms, regardless of the values I set for the properties above.
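To make the expectation concrete, here is a small standalone sketch (plain Java, not Spring code) of the delay schedule I would expect from the values above. It assumes a capped exponential backoff in which each delay is the previous one multiplied by backOffMultiplier and limited to backOffMaxInterval. The class and method names are made up for illustration and are not part of any Spring API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only: computes the delays (in ms)
// of a capped exponential backoff. Not part of Spring Cloud Stream.
class BackoffSchedule {

    static List<Long> delays(long initialMs, double multiplier, long maxMs, int retries) {
        List<Long> result = new ArrayList<>();
        double current = initialMs;
        for (int i = 0; i < retries; i++) {
            // Never wait longer than the configured maximum interval.
            result.add((long) Math.min(current, (double) maxMs));
            // Grow the interval for the next retry, again capped at the maximum.
            current = Math.min(current * multiplier, (double) maxMs);
        }
        return result;
    }

    public static void main(String[] args) {
        // Values taken from the properties above:
        // backOffInitialInterval=1000, backOffMultiplier=2.0, backOffMaxInterval=300000
        System.out.println(delays(1000, 2.0, 300_000, 10));
        // prints [1000, 2000, 4000, 8000, 16000, 32000, 64000, 128000, 256000, 300000]
    }
}
```

So under that assumption the gaps between redeliveries should grow from one second up to five minutes, rather than staying at a constant 100 ms.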
Can anyone help me with setting the correct retry and/or backoff settings so that messages get retried accordingly?
A fully working minimal example to illustrate my issue can be found on GitHub and looks like this:
Producer:
@Component
public class Main {
    private static final Logger LOG = getLogger(Main.class);
    private boolean firstExecution = true;

    @Autowired
    private SuccessSwitch consumerSuccessSwitch;

    @Autowired
    private PubSubTemplate pubSubTemplate;

    @Scheduled(fixedDelay = 10000)
    public void doSomethingAfterStartup() {
        if (firstExecution) {
            firstExecution = false;
            consumerSuccessSwitch.letFail();
            pubSubTemplate.publish("topic", "payload");
            LOG.info("Message published");
        } else {
            consumerSuccessSwitch.letSucceed();
        }
    }
}
Consumer:
@EnableBinding(Sink.class)
class Subscriber {
    private static final Logger LOG = getLogger(Subscriber.class);

    @Autowired
    private SuccessSwitch successSwitch;

    private int retryCounter = 0;

    @StreamListener(INPUT)
    public void handleMessage(Message<String> message) {
        LOG.info("Received: {} for the {} time", message.getPayload(), ++retryCounter);
        if (!successSwitch.succeeded()) {
            throw new RuntimeException();
        }
        LOG.info("Received: {} times", retryCounter);
    }
}
Toggle ack/nack in consumer:
@Component
public class SuccessSwitch {
    private boolean success = false;

    public void letSucceed() {
        this.success = true;
    }

    public void letFail() {
        this.success = false;
    }

    public boolean succeeded() {
        return success;
    }
}
Upvotes: 1
Views: 768
Reputation: 4361
Looking at PubSubChannelProvisioner in the gcp-pubsub binder: when it creates a subscription, the binder does not configure a retry policy. So unless the retry is somehow handled within spring-cloud-stream rather than by the underlying native Pub/Sub mechanism, you are out of luck.
What I am considering is creating the subscription myself using PubSubAdmin. spring-cloud-stream will then see the existing subscription, with the correct retry policy, and use it.
Upvotes: 0