Reputation: 37016
I am seeing a very strange intermittent bug when I test my application in a GCP environment. I can't find concrete steps to reproduce it, but it does happen from time to time.
I see that the message was successfully acknowledged:
2019-12-06 12:37:47.348 INFO 1 --- [sub-subscriber3] .i.g.MyAcknowledgementHandler : Acknowledged message - 1575635858865987
I have the following code to acknowledge:
var generation = message.getHeaders().get("objectGeneration");
pubSubMessage = message.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class);
pubSubMessage.ack().addCallback(
        v -> {
            removeFromIdempotentStore(targetMessage, false);
            log.info("Acknowledged message - {}", generation);
        },
        e -> {
            removeFromIdempotentStore(targetMessage, false);
            log.error("Failed to acknowledge message - {}", generation, e);
        }
);
I also see the following log:
2019-12-06 12:37:48.868 WARN 1 --- [sub-subscriber1] c.b.m.i.MyDiscardedMessagesHandler : Duplicate message received GenericMessage [... headers={gcp_pubsub_acknowledgement=org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter$1@1abafe68, bxwid=12345, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@3c3efd63, idempotent.keys=[objectId.mixed emails.csv, objectGeneration.1575635858865987].....
And it repeats endlessly. Additionally, I can see in the subscription diagram that the message is still there (after the acknowledgement callback was invoked).
Discard logic:
....
.gateway(nexrFlow, idempotentByHeader("objectId"));

Consumer<GatewayEndpointSpec> idempotentByHeader(String objectIdHeader) {
    return endpointSpec -> endpointSpec.advice(idempotentByHeaderInterceptor(objectIdHeader))
            .errorChannel(errorChannel())
            .replyTimeout(0L);
}

default IdempotentReceiverInterceptor idempotentByHeaderInterceptor(String header) {
    MessageProcessor<String> headerSelector = message -> headerExpression(header).apply(message);
    var interceptor = new IdempotentReceiverInterceptor(new MetadataStoreSelector(headerSelector, idempotencyStore()));
    interceptor.setDiscardChannel(idempotentDiscardChannel());
    return interceptor;
}
I have no idea how to troubleshoot this. Any ideas?
Upvotes: 2
Views: 629
Reputation: 468
Pub/Sub is designed to guarantee at-least-once message delivery, so this is expected behavior. Check out the product FAQs for the official explanation.
As stated there, one reason for frequent duplicates is that messages are not acknowledged within the acknowledgement deadline. If processing a message takes longer than the deadline, the message is resent. That is why I asked about the AckDeadline in my previous comment. By default it should be 10 seconds. You can check how it is configured in the console by clicking on the subscription you are using. You can try increasing it to give messages more time to finish processing; do this by clicking Edit inside the subscription.
However, even if you acknowledge within the deadline, duplicates will sometimes occur. This is necessary in order to guarantee at-least-once delivery.
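To check whether the deadline is really the cause, it may help to time the handler and compare the elapsed time with the subscription's ack deadline. The following is only a minimal sketch in plain Java: `AckDeadlineCheck`, `Timed`, `exceeds` and `time` are hypothetical names, not part of Spring Cloud GCP or the Pub/Sub client, and the 10-second value is just the default mentioned above.

```java
import java.time.Duration;
import java.util.function.Supplier;

// Hypothetical helper for troubleshooting; not a Pub/Sub or Spring API.
final class AckDeadlineCheck {

    /** Result of one timed handler invocation. */
    static final class Timed<T> {
        final T result;
        final Duration elapsed;
        final boolean exceededDeadline;

        Timed(T result, Duration elapsed, boolean exceededDeadline) {
            this.result = result;
            this.elapsed = elapsed;
            this.exceededDeadline = exceededDeadline;
        }
    }

    /** True when processing took strictly longer than the ack deadline. */
    static boolean exceeds(Duration elapsed, Duration ackDeadline) {
        return elapsed.compareTo(ackDeadline) > 0;
    }

    /** Runs the handler, measures wall-clock time, and flags a missed deadline. */
    static <T> Timed<T> time(Supplier<T> handler, Duration ackDeadline) {
        long start = System.nanoTime();
        T result = handler.get();
        Duration elapsed = Duration.ofNanos(System.nanoTime() - start);
        return new Timed<>(result, elapsed, exceeds(elapsed, ackDeadline));
    }

    public static void main(String[] args) {
        // Assumed deadline: the 10-second default; use the value shown in the console.
        Duration ackDeadline = Duration.ofSeconds(10);
        Timed<String> timed = time(() -> "processed", ackDeadline);
        if (timed.exceededDeadline) {
            System.out.println("Handler took " + timed.elapsed + ", longer than the ack deadline");
        }
    }
}
```

If the logged time is regularly close to or above the deadline, redelivery before the ack arrives would be a plausible explanation for the duplicates.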
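Because duplicates can always arrive, the consumer has to tolerate them. A message that is never acknowledged is redelivered, so a duplicate that is only discarded and logged would keep coming back. The question does not show whether the discard handler acknowledges duplicates, so treat this as something to check rather than a diagnosis. Below is a minimal sketch of the idea in plain Java. `DuplicateAwareHandler`, `Outcome` and `handle` are hypothetical names, the in-memory set merely stands in for the real idempotency store, and the `ack` callback stands in for something like `pubSubMessage.ack()`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of an idempotent consumer; not a Spring Integration API.
final class DuplicateAwareHandler {

    enum Outcome { PROCESSED, DUPLICATE_ACKED }

    // Stand-in for a real idempotency store (in-memory only).
    private final Set<String> seenKeys = ConcurrentHashMap.newKeySet();

    /**
     * Processes the message only on first delivery, but acknowledges it
     * in both cases so that a duplicate is not redelivered again.
     */
    Outcome handle(String key, Runnable process, Runnable ack) {
        boolean firstDelivery = seenKeys.add(key);
        if (firstDelivery) {
            try {
                process.run();
            } catch (RuntimeException e) {
                // Forget the key so that a later redelivery can retry the work.
                seenKeys.remove(key);
                throw e;
            }
        }
        ack.run();
        return firstDelivery ? Outcome.PROCESSED : Outcome.DUPLICATE_ACKED;
    }

    public static void main(String[] args) {
        DuplicateAwareHandler handler = new DuplicateAwareHandler();
        String key = "objectGeneration.1575635858865987";
        Runnable process = () -> System.out.println("processing " + key);
        Runnable ack = () -> System.out.println("ack " + key);
        System.out.println(handler.handle(key, process, ack));
        System.out.println(handler.handle(key, process, ack));
    }
}
```

The design choice here is that the ack happens on both paths: the duplicate is skipped for processing, yet Pub/Sub is still told that the delivery was handled.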
Upvotes: 3