Reputation: 6394
I would like to receive files into a Google Cloud Storage bucket and have a Python job run exactly once for each file. I would like many such Python jobs to be running concurrently, in order to process many files in parallel, but each file should only be processed once.
I have considered the following:
Generate Pub/Sub messages for the OBJECT_FINALIZE event on the bucket. The issue here is that Pub/Sub may deliver messages more than once, so a pool of Python jobs listening to the same subscription could process the same file more than once, so I could either...
or
I think 2 would work but it also feels over-engineered.
Instead of using Pub/Sub, have jobs poll for new files in the bucket.
This feels like it would simply replace Pub/Sub with a less robust solution that would still require a locking mechanism.
Use Eventarc to trigger a Cloud Run container holding my code. This seems similar to Pub/Sub, and simpler, but I can find no explanation of how Eventarc deals with things like retries, or whether it comes with any exactly-once guarantees.
Create a central controller process that handles deduplication of file events (received either through Pub/Sub, polling, or Eventarc), then spawns worker jobs and allocates each file exactly once to a worker job.
I think this could also work but creates a single point of failure and potentially a throughput bottleneck.
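Whichever route is chosen, the worker's first step is the same: recover the bucket and object name from the event. For a Cloud Storage Pub/Sub notification, the message data is the object's metadata serialized as JSON (including `bucket` and `name`), and the event type arrives as the `eventType` message attribute. A minimal sketch (the sample payload below is illustrative, not real bucket data):

```python
import json

def parse_gcs_notification(data: bytes, attributes: dict):
    """Extract (bucket, object name) from a Cloud Storage Pub/Sub notification.

    Returns None for events other than OBJECT_FINALIZE, which we ignore here.
    """
    if attributes.get("eventType") != "OBJECT_FINALIZE":
        return None
    payload = json.loads(data)
    return payload["bucket"], payload["name"]

# Illustrative message, shaped like a real notification:
data = json.dumps({"bucket": "my-bucket", "name": "incoming/file-001.csv"}).encode()
attrs = {"eventType": "OBJECT_FINALIZE"}
print(parse_gcs_notification(data, attrs))  # ('my-bucket', 'incoming/file-001.csv')
```

Note that parsing alone does nothing for deduplication; it only gives each approach above the same starting point.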
Upvotes: 1
Views: 1197
Reputation: 625
You're on the right track, and yes, Pub/Sub push messages may be delivered more than once.
One simple technique to manage that is to rename the file as you start processing it. Renaming is an atomic operation, so if it succeeds, you're good to process the file.
from google.cloud import storage

PROC_PRF = "processing"

bucketName = # get it from the message
fileName = # get it from the message

# Renaming the file below triggers another google.storage.object.finalize event
if fileName.endswith("." + PROC_PRF):
    print("Exiting due to rename event")
    # Ack the message and exit
    return

storage_client = storage.Client()
bucket = storage_client.bucket(bucketName)
blob = bucket.get_blob(fileName)
try:
    newBlob = bucket.rename_blob(blob, new_name=fileName + "." + PROC_PRF)
except Exception:
    raise RuntimeError("Error: File rename from " + fileName + " failed, is this a duplicate function call?")
# The rename worked - process the file & message
Upvotes: 1
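To round out the suffix convention above, the worker also needs to tell claimed objects apart from fresh ones and to recover the original name once processing finishes. A small sketch (the helper names are mine, not from the answer's code):

```python
PROC_PRF = "processing"

def is_claimed(name: str) -> bool:
    # A file some worker has already renamed ends with ".processing"
    return name.endswith("." + PROC_PRF)

def original_name(claimed: str) -> str:
    # Strip the ".processing" suffix to get back the original object name
    if not is_claimed(claimed):
        raise ValueError(claimed + " is not a claimed file")
    return claimed[: -(len(PROC_PRF) + 1)]

print(original_name("data/batch-42.csv.processing"))  # data/batch-42.csv
```

Using `endswith` rather than a substring check avoids skipping legitimate files that merely contain the word "processing" in their names.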