python_interest

Reputation: 874

Google Cloud PubSub not being called properly in Python

I have been racking my brain, but I can't figure out what is causing this issue. I am trying to read an image and pass it to Pub/Sub. Once the message is sent through Pub/Sub, it is forwarded to an AutoML model that predicts a label for the given image. Below is the code snippet:

global val1
@app.route("/", methods=['GET', 'POST'])
def index():  # name assumed; the def line is missing from the original snippet
    doc_type = request.form.get('submit_button')
    file_name = secure_filename(file.filename)
    blob = file.read()
    flash('File upload successful', 'info')
    # Initializing PubSub
    publisher, subscriber, topic, subscription = pubsub_init(doc_type)
    blob = blob + bytes(doc_type, 'utf-8')
    subscriber.subscribe(subscription, callback)
    publisher.publish(topic, blob)
    flash("the uploaded file is " + val1, 'info')

Init function:

def pubsub_init(doctype):
    publisher = pubsub.PublisherClient()
    subscriber = pubsub.SubscriberClient()
    if doctype == "License":
        subscription = <<sub name>>
    elif doctype == "Credit":
        subscription = <<subname>>
    elif doctype == "Passport":
        subscription = <<subname>>
    else:
        print("invalid choice")
    topic = <<topic>>
    print(subscription)
    return (publisher, subscriber, topic, subscription)

My Callback:

def callback(message):
    #print("hello",flush=True)
    print("making global")
    project_id = <<proj id>>
    val1, val2 = predict_value(new_message, model_id, project_id, compute_region)
    message.ack()

But I am getting an error saying that val1 is not defined. Could you please advise on this?

Upvotes: 2

Views: 904

Answers (2)

Dustin Ingram

Reputation: 21520

The issue here is that subscriber.subscribe(subscription, callback) is setting up an asynchronous call to callback.

This means that when you publish a new message, you're essentially setting up a race between the call to flash(...) and the callback. Since the callback presumably takes some time to complete, the flash line wins the race, but val1 hasn't been created yet at that point, hence your error.
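
To make the race concrete, here is a minimal sketch that uses only the standard library, not the Pub/Sub client. The names slow_callback and handle_request are hypothetical stand-ins for your callback and your route, and the time.sleep is an assumed placeholder for the prediction call:

```python
import threading
import time


def slow_callback(message):
    """Stand-in for the subscriber callback; the sleep mimics a slow prediction."""
    global val1
    time.sleep(0.2)
    val1 = "label for " + message


def handle_request():
    # Start the callback in the background, much as an asynchronous subscriber would.
    worker = threading.Thread(target=slow_callback, args=("image-bytes",))
    worker.start()
    try:
        # This line runs right away, before the callback has assigned val1.
        outcome = "the uploaded file is " + val1
    except NameError as exc:
        outcome = "NameError: " + str(exc)
    worker.join()  # only here so the sketch exits cleanly
    return outcome


result = handle_request()
print(result)
```

The main thread reaches the string concatenation while the worker is still sleeping, so the lookup of val1 fails in the same way as in your route.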

There are ways to control the concurrency, such as blocking on the subscriber's future, which might make what you're trying to do possible.
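
As a rough sketch of the blocking idea, the example below hands the callback's value back through a standard-library queue.Queue and waits for it before continuing. It illustrates the pattern only and is not the Pub/Sub client's API; handle_request, the nested callback, and the time.sleep are all assumptions made for the illustration:

```python
import queue
import threading
import time


def handle_request(message, timeout=5.0):
    results = queue.Queue(maxsize=1)  # hands the callback's value back to the caller

    def callback(msg):
        time.sleep(0.2)  # assumed stand-in for the slow prediction call
        results.put("label for " + msg)

    # Run the callback in the background, as an asynchronous subscriber would.
    threading.Thread(target=callback, args=(message,)).start()
    # Block until the callback has produced a value; raises queue.Empty on timeout.
    val1 = results.get(timeout=timeout)
    return "the uploaded file is " + val1


result = handle_request("image-bytes")
print(result)
```

Because the caller waits on the queue, the value is guaranteed to exist before it is used, and no global variable is needed.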

However, before attempting that, I would ask why you're trying to use Pub/Sub here in the first place. It seems like you're just setting up a publisher and subscriber to publish a single message, and then trying to do something with the result of that message. Why not just do it all inline?

@app.route("/", methods=['GET', 'POST'])
def your_function():
    # `request` is Flask's global request object, not a view argument
    doc_type = request.form.get('submit_button')
    file_name = secure_filename(file.filename)
    blob = file.read()
    flash('File upload successful', 'info')
    blob = blob + bytes(doc_type, 'utf-8')
    # turn a blob into new_message, get model_id from somewhere?
    project_id = <<proj id>>
    val1, val2 = predict_value(new_message, model_id, project_id, compute_region)
    flash("the uploaded file is " + val1, 'info')

Upvotes: 2

GAEfan

Reputation: 11360

If you're going to assign to a global, you have to declare it as such inside the function:

def callback(message):
    global val1
    global val2
    ...
    val1, val2 = predict_value(new_message,model_id,project_id,compute_region)
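
A small self-contained illustration of the difference, with hypothetical function names. Note also that a bare `global val1` at module level, as in the question, does not create the variable; only an assignment does:

```python
def without_global():
    val1 = "local only"  # binds a new local name; nothing is created at module level


def with_global():
    global val1
    val1 = "shared"  # binds the module-level name


without_global()
before = "val1" in globals()   # the module still has no val1
with_global()
after = globals().get("val1")  # now the module-level name exists
print(before, after)
```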

Upvotes: 1
