Reputation: 19000
I am trying to grasp the async operations introduced with NDB, and I would like to use @ndb.tasklet
to make some of my work asynchronous.
A simple example would be string id generation in an overridden get_or_insert_async.
Is this a correct way to do things? What can be improved here?
@classmethod
@ndb.tasklet
def get_or_insert_async(cls, *args):
    id = cls.make_string_id(*args)
    model = yield super(MyModel, cls).get_or_insert_async(id)
    raise ndb.Return(model)
Another example would be doing stuff in a loop, in a fan-out kind of way. Is this correct?
@classmethod
@ndb.tasklet
def do_stuff(cls, some_collection):
    @ndb.tasklet
    def internal_tasklet(data):
        do_some_long_taking_stuff(data)
        id = make_stuff_needed_for_id(data)
        model = yield cls.get_or_insert_async(id)
        model.long_processing(data)
        yield model.put_async()
        raise ndb.Return(None)

    for data in some_collection:
        # will it parallelise internal_tasklet execution?
        yield internal_tasklet(data)
    raise ndb.Return(None)
EDIT:
As I understand the whole concept, yields are here to provide Future objects, which are then collected in parallel (where possible) and executed asynchronously. Am I correct?
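If I've got that right, the simplest form of it would be yielding several futures in one go, something like this (just a sketch with made-up keys, to check my understanding):

@ndb.tasklet
def get_both(key_a, key_b):
    # both get RPCs are in flight at the same time;
    # the tasklet resumes once both futures have results
    a, b = yield key_a.get_async(), key_b.get_async()
    raise ndb.Return((a, b))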
After Nick's hint (is this what you meant?):
@classmethod
@ndb.tasklet
def do_stuff(cls, some_collection):
    @ndb.tasklet
    def internal_tasklet(data):
        do_some_long_taking_stuff(data)
        id = make_stuff_needed_for_id(data)
        model = yield cls.get_or_insert_async(id)
        model.long_processing(data)
        raise ndb.Return(model)  # change here

    models = []
    for data in some_collection:
        # will it parallelise internal_tasklet execution?
        m = yield internal_tasklet(data)  # change here
        models.append(m)  # change here
    keys = yield ndb.put_multi_async(models)  # change here
    raise ndb.Return(keys)  # change here
EDIT:
New revised version…
@classmethod
@ndb.tasklet
def do_stuff(cls, some_collection):
    @ndb.tasklet
    def internal_tasklet(data):
        do_some_long_taking_stuff(data)
        id = make_stuff_needed_for_id(data)
        model = yield cls.get_or_insert_async(id)
        model.long_processing(data)
        raise ndb.Return(model)

    futures = []
    for data in some_collection:
        # tasklets won't run in parallel, but while
        # one is waiting on a yield (and the RPC underneath)
        # another will advance its execution
        # up to the next yield or return
        fut = internal_tasklet(data)  # change here
        futures.append(fut)  # change here

    ndb.Future.wait_all(futures)  # change here
    models = [fut.get_result() for fut in futures]
    keys = yield ndb.put_multi_async(models)  # change here
    raise ndb.Return(keys)  # change here
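For completeness, this is how I would drive it from ordinary synchronous handler code versus from another tasklet (just a sketch with my class name and a hypothetical items collection):

# from synchronous code: block on the returned future
keys = MyModel.do_stuff(items).get_result()

# from inside another tasklet: yield it instead of blocking
@ndb.tasklet
def caller():
    keys = yield MyModel.do_stuff(items)
    raise ndb.Return(keys)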
Upvotes: 2
Views: 900
Reputation: 101149
You don't need to use tasklets if all you want to do is call something async with different arguments - just return the wrapped function's return value, like this:
@classmethod
def get_or_insert_async(cls, *args):
    id = cls.make_string_id(*args)
    return super(MyModel, cls).get_or_insert_async(id)
I'd be cautious about this for several reasons, though: you're changing the meaning of a built-in method, which is usually a bad idea; you're changing the signature (positional arguments but no keyword arguments); and you're not passing extra arguments through to the original method.
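If you keep the override anyway, one way to soften the last two points is to forward keyword arguments untouched (a sketch, not a complete fix for the signature change):

@classmethod
def get_or_insert_async(cls, *args, **kwargs):
    # pass any NDB options (e.g. parent=...) and property values straight through
    id = cls.make_string_id(*args)
    return super(MyModel, cls).get_or_insert_async(id, **kwargs)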
For your second example, yielding things one at a time will force NDB to wait on their completion - 'yield' is synonymous with 'wait'. Instead, execute the tasklet function for each element in the collection, then wait on them all (by calling yield on the list) at the same time.
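Concretely, that might look something like this (a sketch reusing the internal_tasklet from your question; yielding the list of futures waits on all of them at once):

@classmethod
@ndb.tasklet
def do_stuff(cls, some_collection):
    # start every tasklet first; each call returns a Future immediately
    futures = [internal_tasklet(data) for data in some_collection]
    # yielding the list waits on all of them concurrently
    models = yield futures
    keys = yield ndb.put_multi_async(models)
    raise ndb.Return(keys)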
Upvotes: 1