Reputation: 472
I have a pull queue being serviced by a backend and when the queue is empty I need to trigger another script.
At the moment I am using a very crude detection in the method that leases the tasks from the queue, so that if the task list returned is empty we presume that there are no more to lease and trigger the next step. However, while this works most of the time, occasionally a lease request seems to return an empty list even though there are tasks available.
Anyway, the better way to do it I think is to use the fetch_statistics method of the Queue. That way the script can monitor whats going on in the pull queue and know that there are no more items left in the queue. Now this is obviously available via the REST api for queues, but it seems rather backward to use this when I am using these internally.
So I am making the Queue.fetch_statistics() call, but it throws an error. I've tried putting the stated error into Google, but it returns nothing. Same here on stackoverflow.
It always throws:
AttributeError: type object 'QueueStatistics' has no attribute '_QueueStatistics__TranslateError'
My code is:
q = taskqueue.Queue('reporting-pull')
try:
logging.debug(q.fetch_statistics())
except Exception, e:
logging.exception(e)
Can anyone shed any light on this? I am I doing something really stupid here?
Upvotes: 2
Views: 774
Reputation: 1806
The Task Queue Statistics API is now documented and publicly available. The error no longer occurs.
Upvotes: 2
Reputation: 472
Just incase it is useful to anyone else, here is an example function to get you started getting queue info from your app. Its only an example, and could do with better error handling, but it should get you up and running. Previously we have used the Taskqueue client but I thought that was a bit overkill as we can lease and delete in the code anyway, so I used app identity, and it worked a treat.
from google.appengine.api import taskqueue
from google.appengine.api import app_identity
from google.appengine.api import urlfetch
try:
import json
except ImportError:
import simplejson as json
import logging
def get_queue_info(queue_name, stats=False):
'''
Uses the Queue REST API to fetch queue info
Args:
queue_name: string - the name of the queue
stats: boolean - get the stats info too
RETURNS:
DICT: from the JSON response or False on fail
'''
scope = 'https://www.googleapis.com/auth/taskqueue'
authorization_token, _ = app_identity.get_access_token(scope)
app_id = app_identity.get_application_id()
#note the s~ denoting HRD its not mentioned in the docs as far as
#I can see, but it wont work without it
uri = 'https://www.googleapis.com/taskqueue/v1beta1/projects/s~%s/taskqueues/%s?getStats=%s' % (app_id, queue_name, stats)
#make the call to the API
response = urlfetch.fetch(uri, method="GET", headers = {"Authorization": "OAuth " + authorization_token})
if response.status_code == 200:
result = json.loads(response.content)
else:
logging.error('could not get queue')
logging.error(response.status_code)
logging.error(response.content)
return False
return result
Don't forget to update your queue.yaml with the acl for your app identity
-name: queue_name
mode: pull
acl:
- user_email: [email protected]
I hope someone finds this useful.
In the meantime I have posted a Feature request so we can do this with the Queue object, please go and star it if you want it too. http://goo.gl/W8Pk1
Upvotes: 3
Reputation: 16890
The immediate reason for the specific error you're getting seems to be a bug in the code; Queue.fetch_statistics() calls QueueStatistics.fetch() calls QueueStatistics._FetchMultipleQueues() which apparently encounters an apiproxy_errors.ApplicationError and then tries to call cls.__TranslateError() but there is no such method on the QueueStatistics class.
I don't know the deeper reason for the ApplicationError, but it may mean that the feature is not yet supported by the production runtime.
Upvotes: 1