Brian

Reputation: 7654

Java client does not retrieve unacked messages from GCP Pub/Sub topic

I have a Spring Boot client configured with

spring:
  cloud:
    gcp:
      pubsub:
        publisher:
          enable-message-ordering: true
        subscriber:
          flow-control:
            max-outstanding-element-count: 10
(among other spring configs)

using this pub/sub config to set MANUAL AckMode:

@Configuration
public class PubSubConfig {
    ...

    @Bean
    public PubSubInboundChannelAdapter inboundChannelAdapter(
            MessageChannel inputChannel,
            AppConfig appConfig,
            PubSubTemplate pubSubTemplate) {
        PubSubInboundChannelAdapter adapter =
                new PubSubInboundChannelAdapter(pubSubTemplate, "(subscription id)");
        adapter.setOutputChannel(inputChannel);
        adapter.setAckMode(AckMode.MANUAL);
        adapter.setPayloadType(SomeEvent.class);
        return adapter;
    }
}

and I have this event handler that consumes messages from the pull subscription:

@Component
@Slf4j
@RequiredArgsConstructor
public class EventHandler {

    @ServiceActivator(inputChannel = "inputChannel")
    public void process(SomeEvent someEvent,
                        @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
        // @Slf4j generates a field named "log" by default
        log.info("Received event {}", someEvent);
        long now = System.currentTimeMillis() / 1000L;
        if (someEvent.getEarliestRetryTime() > now) {
            log.warn(
                    "Event not ready to run yet {}",
                    new Date(someEvent.getEarliestRetryTime() * 1000)
            );
            // No-ack the message
            return;
        }

        // Do stuff
        boolean success = doStuff();
        if (success) {
            message.ack();
        }
        // Not successful - no ack and retry
    }
}

The general idea is for the message to wait 10 minutes before it is processed. Not acknowledging the message should cause it to be redelivered after 600 seconds, as configured here.

[Screenshot: subscription retry policy settings]

However, the message is received only once, logging "Event not ready to run yet [timestamp in the past]", and then stays in the queue.


The goal is for messages to be retried as long as possible, every 10 minutes, until doStuff succeeds. What might I be missing?

Upvotes: 0

Views: 75

Answers (1)

Brian Collins

Reputation: 1

Are you using multiple ordering keys when publishing messages, or just one? If all your messages use the same ordering key and your client is not ACKing messages promptly upon delivery, that could explain the slow delivery: unacked messages delay delivery of subsequent messages with the same ordering key.

Also, are you setting spring.cloud.gcp.pubsub.subscriber.max-ack-extension-period to a nonzero value? This setting causes your Spring subscriber to perform automatic lease management of your messages. If your messages are taking longer than 11m (60s lease expiration time + 10m retry policy redelivery delay) to be redelivered, I expect your Spring subscriber is extending the ack deadline, which delays lease expiration and therefore redelivery of your messages.
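If lease extension does turn out to be the cause, one thing that may help is to nack the message explicitly instead of returning without a response. As far as I know, nack() releases the lease right away, so redelivery is then governed by the retry policy rather than by however long the client keeps extending the deadline. The sketch below only illustrates the branching and runs without Pub/Sub: Acknowledgeable and RecordingMessage are hypothetical stand-ins for the ack()/nack() pair on BasicAcknowledgeablePubsubMessage, and NackSketch is a made-up class name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

public class NackSketch {

    // Hypothetical stand-in for the ack()/nack() pair that
    // BasicAcknowledgeablePubsubMessage exposes; not the Spring type itself.
    interface Acknowledgeable {
        void ack();
        void nack();
    }

    // Records which call was made so the sketch can run without Pub/Sub.
    static class RecordingMessage implements Acknowledgeable {
        final List<String> calls = new ArrayList<>();

        @Override
        public void ack() {
            calls.add("ack");
        }

        @Override
        public void nack() {
            calls.add("nack");
        }
    }

    // Same branching as the handler in the question, except that every
    // path ends in an explicit ack() or nack(). Times are epoch seconds.
    static void process(long earliestRetryTime, long now,
                        BooleanSupplier doStuff, Acknowledgeable message) {
        if (earliestRetryTime > now) {
            // Not ready yet: give the message back instead of holding it.
            message.nack();
            return;
        }
        if (doStuff.getAsBoolean()) {
            message.ack();
        } else {
            // Work failed: ask for redelivery.
            message.nack();
        }
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis() / 1000L;

        RecordingMessage notReady = new RecordingMessage();
        process(now + 600, now, () -> true, notReady);

        RecordingMessage ready = new RecordingMessage();
        process(now - 1, now, () -> true, ready);

        System.out.println("not ready: " + notReady.calls);
        System.out.println("ready: " + ready.calls);
    }
}
```

In your real handler the two nack() calls would go where the "No-ack" and "Not successful" comments are now. Whether this gives you exactly the 10-minute spacing still depends on the retry policy configured on the subscription.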

Upvotes: 0
