jl6

Reputation: 6394

How can I process a newly uploaded object in Google Cloud Storage exactly once?

I would like to receive files into a Google Cloud Storage bucket and have a Python job run exactly once for each file. I would like many such Python jobs to be running concurrently, in order to process many files in parallel, but each file should only be processed once.

I have considered the following:

Pub/Sub messages

Generate Pub/Sub messages for the OBJECT_FINALIZE event on the bucket. The issue here is that Pub/Sub may deliver messages more than once, so a pool of Python jobs listening to the same subscription may run more than one job for the same file. To avoid that, I could either...

  1. Use Dataflow to deduplicate messages, but in my non-streaming use case, Dataflow seems like expensive overkill, and this answer seems to suggest it's not the right tool for the job.

or

  2. Create a locking mechanism using a transactional database (say, PostgreSQL on Cloud SQL). Any job receiving a message can attempt to acquire a lock named after the file; any job that fails to acquire the lock can terminate without ACKing the message; and the job holding the lock can continue processing and mark the lock as done to prevent any future acquisition of that lock.

I think 2 would work but it also feels over-engineered.
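
For illustration, the lock acquisition in option 2 might look roughly like this with PostgreSQL and psycopg2. The `file_locks` table (`file_name TEXT PRIMARY KEY, done BOOLEAN DEFAULT FALSE`) and the function names are just a sketch, not anything I have built:

    import psycopg2

    def try_acquire_lock(conn, file_name):
        """Return True only for the single worker that wins the lock for file_name."""
        with conn, conn.cursor() as cur:
            # Exactly one INSERT succeeds per file name; every other worker
            # hits the primary-key conflict and inserts nothing.
            cur.execute(
                "INSERT INTO file_locks (file_name) VALUES (%s)"
                " ON CONFLICT (file_name) DO NOTHING",
                (file_name,),
            )
            return cur.rowcount == 1

    def mark_done(conn, file_name):
        """Record completion so the lock is never handed out again."""
        with conn, conn.cursor() as cur:
            cur.execute(
                "UPDATE file_locks SET done = TRUE WHERE file_name = %s",
                (file_name,),
            )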

Polling

Instead of using Pub/Sub, have jobs poll for new files in the bucket.

This feels like it would simply replace Pub/Sub with a less robust solution that would still require a locking mechanism.

Eventarc

Use Eventarc to trigger a Cloud Run container holding my code. This seems similar to Pub/Sub, and simpler, but I can find no explanation of how Eventarc deals with things like retries, or whether it comes with any exactly-once guarantees.

Single controller spawning multiple workers

Create a central controller process that handles deduplication of file events (received through Pub/Sub, polling, or Eventarc), then spawns worker jobs and allocates each file exactly once to a worker job.
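
As a rough sketch (the in-memory `seen` set, the pool size, and the `process_file` stub are all assumptions for illustration), the controller could track object names it has already dispatched and hand each one to a worker pool at most once:

    from concurrent.futures import ProcessPoolExecutor

    def process_file(bucket_name, file_name):
        # Stand-in for the actual per-file Python job
        print(f"processing gs://{bucket_name}/{file_name}")

    def run_controller(event_stream, max_workers=8):
        """Dispatch each (bucket, object) event to a worker at most once."""
        seen = set()
        with ProcessPoolExecutor(max_workers=max_workers) as pool:
            for bucket_name, file_name in event_stream:
                if file_name in seen:
                    continue  # duplicate delivery - ignore it
                seen.add(file_name)
                pool.submit(process_file, bucket_name, file_name)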

I think this could also work but creates a single point of failure and potentially a throughput bottleneck.

Upvotes: 1

Views: 1197

Answers (1)

rossco

Reputation: 625

You're on the right track, and yes, Pub/Sub push messages may be delivered more than once.

One simple technique to manage that is to rename the file as you start processing it. Renaming is an atomic operation, so if it succeeds, you're good to process the file.

    PROC_PRF = "processing"
    bucketName = # get it from the message
    fileName = # Get it from the message)

    # Renaming of the file below trriggers another google.storage.object.finalize event
    if PROC_PRF in fileName:
        print("Exiting due to rename event")
        # Ack the message an exit
        return

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucketName)
    blob = bucket.get_blob(fileName)

    try:
        newBlob = bucket.rename_blob(blob,new_name = fileName+'.'+PROC_PRF)
    except:
        raise RuntimeError("Error: File rename from " + fileName + " failed, is this a duplicate function call?")

    # The rename worked - process the file & message
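
The `process_upload` wrapper and the `event["bucket"]` / `event["name"]` lookups above are only my assumption about how the bucket and object names arrive (a background Cloud Function on the finalize trigger); if you consume the events from a Pub/Sub subscription instead, pull those two values out of the notification's attributes. Either way, the later duplicate delivery finds the original object name already gone, so the lookup/rename fails and the RuntimeError stops it from reprocessing the file.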

Upvotes: 1
