alexriviera06

Reputation: 11

Google Cloud Functions randomly retrying on success

I have a Google Cloud Function triggered by a Pub/Sub topic. The documentation states that messages are acknowledged when the function ends successfully.

However, the function is occasionally retried (with the same execution ID) exactly 10 minutes after the first execution. That delay matches the maximum Pub/Sub acknowledgement deadline.

[Screenshot: Stackdriver Logging]

I also tried to get the message ID and acknowledge it programmatically in the function code, but the Pub/Sub API responds that there is no message to acknowledge with that ID. In Stackdriver Monitoring, I can see that some messages are not being acknowledged.

[Screenshot: Stackdriver Monitoring]

Here is my code, main.py:

import base64
import logging
import traceback

from google.api_core import exceptions
from google.cloud import bigquery, error_reporting, firestore, pubsub

from sql_runner.runner import orchestrator

logging.getLogger().setLevel(logging.INFO)


def main(event, context):

    bigquery_client = bigquery.Client()
    firestore_client = firestore.Client()
    publisher_client = pubsub.PublisherClient()
    subscriber_client = pubsub.SubscriberClient()

    logging.info(
        'event=%s',
        event
    )
    logging.info(
        'context=%s',
        context
    )
    try:
        query_id = base64.b64decode(event.get('data',b'')).decode('utf-8')
        logging.info(
            'query_id=%s',
            query_id
        )

        # inject dependencies
        orchestrator(
            query_id,
            bigquery_client,
            firestore_client,
            publisher_client
        )

        sub_path = (context.resource['name']
            .replace('topics', 'subscriptions')
            .replace('function-sql-runner', 'gcf-sql-runner-europe-west1-function-sql-runner')
        )

        # explicitly ack the message to avoid duplicate invocations
        try:
            subscriber_client.acknowledge(
                sub_path,
                [context.event_id]  # message_id to ack
            )
            logging.warning(
                'message_id %s acknowledged (FORCED)',
                context.event_id
            )
        except exceptions.InvalidArgument as err:
            # google.api_core.exceptions.InvalidArgument: 400 You have passed an invalid ack ID to the service (ack_id=982967258971474).
            logging.info(
                'message_id %s already acknowledged',
                context.event_id
            )
            logging.debug(err)

    except Exception as err:
        # catch all exceptions and log to prevent cold boot
        # report with error_reporting
        error_reporting.Client().report_exception()
        logging.critical(
            'Internal error : %s -> %s',
            str(err),
            traceback.format_exc()
        )


if __name__ == '__main__':  # for testing
    from collections import namedtuple  # use namedtuple to avoid Class creation
    Context = namedtuple('Context', 'event_id resource')
    context = Context('666', {'name': 'projects/my-dev/topics/function-sql-runner'})

    script_to_start = b' '   # launch the 1st script
    script_to_start = b'060-cartes.sql'

    main(
        event={"data": base64.b64encode(script_to_start)},
        context=context
    )

Here is my code, runner.py:

import logging
import os

from retry import retry


PROJECT_ID = os.getenv('GCLOUD_PROJECT') or 'my-dev'


def orchestrator(query_id, bigquery_client, firestore_client, publisher_client):
    """ 
    if query_id empty, start the first sql script
    else, call the given query_id. 

    Anyway, call the next script.
    If the sql script is the last, no call

    retrieve SQL queries from FireStore
    run queries on BigQuery
    """
    docs_refs = [ 
        doc_ref.get() for doc_ref in 
        firestore_client.collection(u'sql_scripts').list_documents()
    ]

    sorted_queries = sorted(docs_refs, key=lambda x: x.id)

    if not query_id.strip():  # first execution
        current_index = 0
    else:
        # find the query to run
        query_ids = [ query_doc.id for query_doc in sorted_queries]
        current_index = query_ids.index(query_id)

    query_doc = sorted_queries[current_index]

    bigquery_client.query(
        query_doc.to_dict()['request'],  # sql query
    ).result()

    logging.info(
        'Query %s executed',
        query_doc.id
    )

    # exit if the current query is the last
    if len(sorted_queries) == current_index + 1:
        logging.info('All scripts were executed.')
        return

    next_query_id = sorted_queries[current_index+1].id.encode('utf-8')

    publish(publisher_client, next_query_id)

@retry(tries=5)
def publish(publisher_client, next_query_id):
    """
    send a message in pubsub to call the next query
    this mechanism allows running one sql script per Function instance
    so as to not exceed the 9min deadline limit
    """
    logging.info('Calling next query %s', next_query_id)

    future = publisher_client.publish(
        topic='projects/{}/topics/function-sql-runner'.format(PROJECT_ID),
        data=next_query_id
    )

    # ensure the publish is successful
    message_id = future.result()
    logging.info('Published message_id = %s', message_id)

It looks like the Pub/Sub message is not acknowledged on success. I do not think my code leaves any background activity running.

My question: why is my function randomly retried even when it succeeds?

Upvotes: 1

Views: 440

Answers (1)

Doug Stevenson

Reputation: 317487

Cloud Functions does not guarantee that your functions will run exactly once. According to the documentation, background functions, including Pub/Sub functions, are given an at-least-once guarantee:

Background functions are invoked at least once. This is because of the asynchronous nature of handling events, in which there is no caller that waits for the response. The system might, in rare circumstances, invoke a background function more than once in order to ensure delivery of the event. If a background function invocation fails with an error, it will not be invoked again unless retries on failure are enabled for that function.

Your code will need to expect that it could possibly receive an event more than once. As such, your code should be idempotent:

To make sure that your function behaves correctly on retried execution attempts, you should make it idempotent by implementing it so that an event results in the desired results (and side effects) even if it is delivered multiple times. In the case of HTTP functions, this also means returning the desired value even if the caller retries calls to the HTTP function endpoint. See Retrying Background Functions for more information on how to make your function idempotent.

Upvotes: 3
