Reputation: 45
I am setting up a new GCP project to read and parse a CSV file as soon as it is uploaded to a bucket. To that end, I have created a trigger that publishes to a Pub/Sub topic; Pub/Sub in turn sends messages to a background function.
Everything seems to be working fine: as soon as a file is uploaded, the trigger comes to life, sends a message to Pub/Sub, and subsequently to the function. I can also see the message coming through to the function.
The problem, however, is sending an ack back to Pub/Sub. Somewhere I read that returning any 2xx status should do the job (i.e. delete the message from the queue), but it does not. As a result Pub/Sub "thinks" the message has not been delivered and sends it over and over again.
import base64
import json

def parse_data(data, context):
    if 'data' in data:
        # The Pub/Sub payload is base64-encoded JSON describing the GCS object
        args = base64.b64decode(data['data']).decode('utf-8')
        pubsub_message = args.replace('\n', ' ')
        properties = json.loads(pubsub_message)
        myBucket = validate_message(properties, 'bucket')
        myFileName = validate_message(properties, 'name')
        fileLocation = 'gs://' + myBucket + '/' + myFileName
        readAndEnhanceData(fileLocation)
        return 'OK', 200
    else:
        return 'Something went wrong, no data received'
Here is the log that shows the function is being called continuously:
D CSV_Parser_Raw_Data 518626734652287 Function execution took 72855 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518626734652287
D CSV_Parser_Raw_Data 518626708442766 Function execution took 131886 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518626708442766
D CSV_Parser_Raw_Data 518624470100006 Function execution took 65412 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518624470100006
D CSV_Parser_Raw_Data 518626734629237 Function execution took 68004 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518626734629237
D CSV_Parser_Raw_Data 518623777839079 Function execution took 131255 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518623777839079
D CSV_Parser_Raw_Data 518623548622842 Function execution took 131186 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518623548622842
D CSV_Parser_Raw_Data 518623769252453 Function execution took 133981 ms,
finished with status: 'ok' CSV_Parser_Raw_Data 518623769252453
So I would be grateful to know what I am missing here, i.e. how can I break this loop?
* UPDATE on the Issue * Thanks to @kamal, who forced me to open my eyes, I tasked myself with recreating the buckets/topics etc. While doing so, I re-reviewed everything and realised that I was writing a temporary file to a sub-folder of the SAME bucket the files are uploaded to! That was the issue: the Finalize event fires for ANY object created ANYWHERE in the bucket. So Kamal was right, multiple uploads were taking place.
If you are tackling your project in the same way, make sure to create a tmp folder and make sure you do not add ANY trigger to that folder (one way to guard against this in code is sketched below).
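For anyone who wants a belt-and-braces guard in the function itself, here is a minimal sketch. It assumes the same parse_data shape as above and a hypothetical tmp/ prefix used only for the function's own temporary files; it simply bails out when the notification refers to such an object:

import base64
import json

TMP_PREFIX = 'tmp/'  # hypothetical prefix reserved for the function's temporary files

def parse_data(data, context):
    properties = json.loads(base64.b64decode(data['data']).decode('utf-8'))
    if properties.get('name', '').startswith(TMP_PREFIX):
        # This Finalize event was caused by our own temporary object;
        # return immediately so we do not re-process (and re-trigger) it.
        return 'Ignored temporary object'
    # ... continue with the normal processing shown above ...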
Upvotes: 1
Views: 1914
Reputation: 17161
In general, Google Cloud Pub/Sub guarantees at-least-once delivery of messages. That means it is always possible to get duplicates, though they should be relatively rare. In your case, it is not that the same message is being processed over and over again; it is different messages. The numbers such as 518626734652287 are the message IDs. Since these are different each time, it means that multiple messages were published. It is likely that one of two things is happening: either objects are being created in the bucket multiple times (each creation publishes its own notification), or the bucket has more than one notification configuration pointing at the same topic. You can check for the latter by running:
gsutil notification list gs://<bucket name>
If the latter is the problem, you will see multiple entries, e.g.:
projects/_/buckets/my-bucket/notificationConfigs/1
Cloud Pub/Sub topic: projects/cloud-pubsub-training-examples/topics/my-topic
projects/_/buckets/my-bucket/notificationConfigs/2
Cloud Pub/Sub topic: projects/cloud-pubsub-training-examples/topics/my-topic
projects/_/buckets/my-bucket/notificationConfigs/3
Cloud Pub/Sub topic: projects/cloud-pubsub-training-examples/topics/my-topic
You can delete the extra notifications by issuing a delete with the config name, e.g.:
gsutil notification delete projects/_/buckets/my-bucket/notificationConfigs/2
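If it is not obvious from the logs which of the two is happening, one quick check is to log the message ID and the object name from inside the function. A minimal sketch, assuming a background function like the one in the question (for Pub/Sub-triggered functions the context's event_id corresponds to the message ID):

import base64
import json

def parse_data(data, context):
    properties = json.loads(base64.b64decode(data['data']).decode('utf-8'))
    # Repeating IDs would indicate redelivery of the same message;
    # distinct IDs indicate separate notifications were published.
    print('message_id=%s object=%s' % (context.event_id, properties.get('name')))

Distinct IDs for the same object name would point at multiple notification configurations (or the same file being uploaded more than once).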
It is also worth noting that with Cloud Functions and Pub/Sub, there can be two types of subscriptions set up: those configured by the user and those configured by Cloud Functions itself. By default, the ack deadline for the former is 10 seconds, meaning that if a message is not acknowledged within 10 seconds, it will be redelivered. For the latter, the default is 600 seconds. If messages take longer to process than the ack deadline, redelivery is likely to occur.
You can try to decrease the time it takes to process a message, or you can increase the ack deadline using the gcloud tool:
gcloud pubsub subscriptions update <subscription name> --ack-deadline=180
This would increase the deadline to 3 minutes. You can also do this in the Cloud Console Pub/Sub page by clicking on the subscription, clicking on "Edit," and then changing the "Acknowledgment Deadline" to a larger value.
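If you prefer to do this programmatically, here is a minimal sketch using the google-cloud-pubsub client library (v2.x); the project and subscription names below are placeholders:

from google.cloud import pubsub_v1
from google.protobuf import field_mask_pb2

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path('my-project', 'my-subscription')

# Only the ack_deadline_seconds field is changed, per the update mask.
subscription = pubsub_v1.types.Subscription(
    name=subscription_path, ack_deadline_seconds=180
)
update_mask = field_mask_pb2.FieldMask(paths=['ack_deadline_seconds'])

with subscriber:
    subscriber.update_subscription(
        request={'subscription': subscription, 'update_mask': update_mask}
    )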
With Cloud Functions, you do not need to return an HTTP status. This is only necessary if you are using a push subscription directly.
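For contrast, the return status only matters if you run your own push endpoint instead of a Cloud Function. A minimal sketch of such an endpoint, using Flask purely for illustration and reusing the helper name from the question:

import base64
import json
from flask import Flask, request

app = Flask(__name__)

@app.route('/pubsub/push', methods=['POST'])
def pubsub_push():
    envelope = json.loads(request.data)
    properties = json.loads(
        base64.b64decode(envelope['message']['data']).decode('utf-8'))
    readAndEnhanceData('gs://' + properties['bucket'] + '/' + properties['name'])
    # For a push subscription, any 2xx response acknowledges the message.
    return ('', 204)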
Upvotes: 1
Reputation: 318
You can't just return 200 from your function. You need to actually "ack" the Pub/Sub message. You haven't shown the code that actually gets the message from Pub/Sub, but I assume that somewhere in that code you have something like a pull-subscriber callback:
def callback(message):
    parse_data(message.data, None)  # parse_data ignores its context argument
That's where you need to ack the message:
def callback(message):
    if parse_data(message.data, None):
        message.ack()
Upvotes: 0