Reputation: 2881
I have a tasklet that takes three parameters: a dictionary with an id and title, a user key, and a list of records already in the database. The function defer_fetch is called in a for loop N times.
from google.appengine.ext import deferred, ndb

@ndb.tasklet
def defer_fetch(data, user_key, already_inserted):
    if data['_id'] not in already_inserted:
        document_key = yield Document.fetch_or_create(data)
        yield defer_fetch_document(user_key, document_key)
    else:
        document_key = already_inserted[data['_id']]
    raise ndb.Return(document_key)

@ndb.tasklet
def defer_fetch_document(user_key, document_key):
    deferred.defer(process_document, user_key, document_key,
                   _queue="process-documents")
    raise ndb.Return(True)
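The calling loop itself isn't shown; a minimal sketch of how such tasklets are typically fanned out in parallel (the name fetch_all and the datas argument are hypothetical) would be:

@ndb.tasklet
def fetch_all(datas, user_key, already_inserted):
    # Yielding a list of futures lets NDB run the tasklets concurrently
    # and waits for all of them before resuming.
    document_keys = yield [defer_fetch(data, user_key, already_inserted)
                           for data in datas]
    raise ndb.Return(document_keys)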
The code for Document.fetch_or_create is executed in parallel across all the calls to defer_fetch, but the call to defer_fetch_document isn't, as seen in the attachment.
How can I make defer_fetch_document also run in parallel?
Upvotes: 4
Views: 1011
Reputation: 2265
As far as I'm aware, deferred doesn't play nicely with ndb tasklets: there is nothing to yield on from deferred.defer, so that call just runs synchronously. Instead, you should use the task queue directly via its add_async functionality.
from google.appengine.api import taskqueue

@ndb.tasklet
def defer_fetch_document(user_key, document_key):
    queue = taskqueue.Queue("process-documents")
    task = taskqueue.Task(url="<url for worker>",
                          params={"document_key": document_key.urlsafe()})
    yield queue.add_async(task)  # add_async returns an RPC you can yield on
    raise ndb.Return(True)
Alternatively, generate a list of document keys, build a list of tasks from it, and call add_async once with the whole list, as sketched below.
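A minimal sketch of that batched variant (the function name defer_fetch_documents is hypothetical, and the worker URL is a placeholder as above):

from google.appengine.api import taskqueue
from google.appengine.ext import ndb

@ndb.tasklet
def defer_fetch_documents(user_key, document_keys):
    # Build one task per document key, then enqueue them all with a
    # single add_async call; add_async also accepts a list of tasks.
    queue = taskqueue.Queue("process-documents")
    tasks = [taskqueue.Task(url="<url for worker>",
                            params={"document_key": key.urlsafe()})
             for key in document_keys]
    yield queue.add_async(tasks)
    raise ndb.Return(True)

Note that the task queue API caps a single add call at 100 tasks, so a very large list would need to be enqueued in chunks.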
Upvotes: 5