CaioT

Reputation: 2211

PubSub acknowledge deadline

I have a Cloud Function that publishes a message to Pub/Sub, which in turn triggers a Cloud Run service to run an archive-file process. When the files are large, my Cloud Run Python code takes a while to process the data, and it looks like Pub/Sub retries the message after 20 seconds (the default acknowledgement deadline), which spins up a second Cloud Run instance. I've increased the acknowledgement deadline to 600s and redeployed everything, but the message is still being retried after 20 seconds. Am I missing anything?
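(For reference, the subscription's live acknowledgement deadline can be checked with the Pub/Sub client library to confirm the update actually took effect; PROJECT_ID and SUBSCRIPTION_ID below are placeholders:)

from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("PROJECT_ID", "SUBSCRIPTION_ID")

# Fetch the current subscription config from the server
subscription = subscriber.get_subscription(request={"subscription": subscription_path})
print(subscription.ack_deadline_seconds)  # should print 600 if the update was applied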

Cloud Function publishing the message code:

# Publishes a message (publisher and topic_path are initialized earlier)
try:
    publish_future = publisher.publish(topic_path, data=message_bytes)
    publish_future.result()  # Verify the publish succeeded
    return 'Message published.'
except Exception as e:
    print(e)
    return (str(e), 500)

Here is the Pub/Sub subscription config: [screenshot of the subscription settings]

Logging showing a second instance being triggered after 20s: [screenshot of the Cloud Run logs]

Cloud Run code:

import base64
import json

from flask import Flask, request

app = Flask(__name__)


@app.route("/", methods=["POST"])
def index():
    envelope = request.get_json()
    if not envelope:
        msg = "no Pub/Sub message received"
        print(f"error: {msg}")
        return f"Bad Request: {msg}", 400

    if not isinstance(envelope, dict) or "message" not in envelope:
        msg = "invalid Pub/Sub message format"
        print(f"error: {msg}")
        return f"Bad Request: {msg}", 400

    pubsub_message = envelope["message"]

    if isinstance(pubsub_message, dict) and "data" in pubsub_message:
        # Decode the base64-encoded event payload
        event_data = base64.b64decode(pubsub_message["data"]).decode("utf-8")
        message = json.loads(event_data)

        # logic to process data/archive
        return ("", 204)

    return "Bad Request: no data in Pub/Sub message", 400



Upvotes: 3

Views: 3005

Answers (1)

Dan

Reputation: 121

You should be able to control the retries by setting a retry policy with minimumBackoff on the subscription. You can set minimumBackoff to its maximum of 600 seconds, matching your ack deadline, so that redelivered messages are more than 600 seconds old. This should reduce the number of duplicate invocations you see.
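If you prefer to do it in code, this is roughly how the retry policy can be set with the Python client's admin API (a sketch; PROJECT_ID and SUBSCRIPTION_ID are placeholders):

from google.cloud import pubsub_v1
from google.protobuf import duration_pb2, field_mask_pb2

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("PROJECT_ID", "SUBSCRIPTION_ID")

# Back off for the full 600s (the maximum) before redelivering
subscription = pubsub_v1.types.Subscription(
    name=subscription_path,
    retry_policy=pubsub_v1.types.RetryPolicy(
        minimum_backoff=duration_pb2.Duration(seconds=600),
        maximum_backoff=duration_pb2.Duration(seconds=600),
    ),
)
update_mask = field_mask_pb2.FieldMask(paths=["retry_policy"])

subscriber.update_subscription(
    request={"subscription": subscription, "update_mask": update_mask}
)

The same change can be made with gcloud pubsub subscriptions update SUBSCRIPTION_ID --min-retry-delay=600s --max-retry-delay=600s, or in the subscription's retry policy settings in the console.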

To handle the duplicates that do get through, making your subscriber idempotent is recommended: keep a record of each processed messageId and skip any message you have already handled.
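For example, a minimal sketch of that check (in-memory only; a real service would use a durable store such as Firestore or Redis, since Cloud Run instances come and go):

# In-memory record of processed messages; replace with a durable store
processed_ids = set()

def handle(envelope):
    message = envelope["message"]
    message_id = message.get("messageId")

    if message_id in processed_ids:
        # Duplicate delivery: acknowledge without reprocessing
        return ("", 204)

    # ... process the archive here ...

    processed_ids.add(message_id)
    return ("", 204)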

From the documentation on at-least-once delivery:

Typically, Pub/Sub delivers each message once and in the order in which it was published. However, messages may sometimes be delivered out of order or more than once. In general, accommodating more-than-once delivery requires your subscriber to be idempotent when processing messages. You can achieve exactly once processing of Pub/Sub message streams using the Apache Beam programming model. The Apache Beam I/O connectors let you interact with Cloud Dataflow via controlled sources and sinks. You can use the Apache Beam PubSubIO connector (for Java and Python) to read from Cloud Pub/Sub. You can also achieve ordered processing with Cloud Dataflow by using the standard sorting APIs of the service. Alternatively, to achieve ordering, the publisher of the topic to which you subscribe can include a sequence token in the message.

Upvotes: 1
