Janusz Skonieczny

Reputation: 19000

How to override an async NDB method and write your own tasklet

I am trying to grasp the async operations introduced with NDB, and I would like to use @ndb.tasklet to make some of my work asynchronous.

A simple example would be string_id generation in an overridden get_or_insert_async.

Is this a correct way to do things? What can be improved here?

@classmethod
@ndb.tasklet
def get_or_insert_async(cls, *args):
    id = cls.make_string_id(*args) 
    model = yield super(MyModel, cls).get_or_insert_async(id)
    raise ndb.Return(model)

Another example would be doing stuff in a loop, in a fan-out kind of way. Is this correct?

@classmethod
@ndb.tasklet
def do_stuff(cls, some_collection):

    @ndb.tasklet
    def internal_tasklet(data):
        do_some_long_taking_stuff(data)
        id = make_stuff_needed_for_id(data)
        model = yield cls.get_or_insert_async(id)
        model.long_processing(data)
        yield model.put_async()
        raise ndb.Return(None)

    for data in some_collection:
        # will it parallelise internal_tasklet execution? 
        yield internal_tasklet(data)

    raise ndb.Return(None)

EDIT:

As I understand the whole concept, yields are there to provide Future objects, which are then collected in parallel (where possible) and executed asynchronously. Am I correct?

After Nick's hint (is it what you meant?):

@classmethod
@ndb.tasklet
def do_stuff(cls, some_collection):

    @ndb.tasklet
    def internal_tasklet(data):
        do_some_long_taking_stuff(data)
        id = make_stuff_needed_for_id(data)
        model = yield cls.get_or_insert_async(id)
        model.long_processing(data)
        raise ndb.Return(model)                # change here

    models = []
    for data in some_collection:
        # will it parallelise internal_tasklet execution? 
        m = yield internal_tasklet(data)       # change here
        models.append(m)                       # change here

    keys = yield ndb.put_multi_async(models)   # change here
    raise ndb.Return(keys)                     # change here

EDIT:

New revised version…

@classmethod
@ndb.tasklet
def do_stuff(cls, some_collection):

    @ndb.tasklet
    def internal_tasklet(data):
        do_some_long_taking_stuff(data)
        id = make_stuff_needed_for_id(data)
        model = yield cls.get_or_insert_async(id)
        model.long_processing(data)
        raise ndb.Return(model)                

    futures = []
    for data in some_collection:
        # tasklets won't run in parallel but while 
        # one is waiting on a yield (and RPC underneath)  
        # the other will advance its execution 
        # up to the next yield or return
        fut = internal_tasklet(data)           # change here
        futures.append(fut)                    # change here

    ndb.Future.wait_all(futures)               # change here

    models = [fut.get_result() for fut in futures]
    keys = yield ndb.put_multi_async(models)   # change here
    raise ndb.Return(keys)                     # change here

Upvotes: 2

Views: 900

Answers (1)

Nick Johnson

Reputation: 101149

You don't need to use tasklets if all you want to do is call something async with different arguments - just return the wrapped function's return value, like this:

@classmethod
def get_or_insert_async(cls, *args):
  id = cls.make_string_id(*args)
  # No tasklet needed: hand back the future from the wrapped call as-is.
  return super(MyModel, cls).get_or_insert_async(id)

I'd be cautious about this for several reasons, though: you're changing the meaning of a built-in function, which is usually a bad idea; you're changing the signature (positional arguments but no keyword arguments); and you're not passing extra arguments through to the original function.
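One way to sidestep those three concerns is to leave the inherited method alone and add a separately named helper that forwards keyword arguments. The sketch below is only an illustration: BaseModel is a hypothetical stand-in for ndb.Model so that it runs without App Engine, and get_or_insert_by_parts_async and the id scheme in make_string_id are made-up names, not part of NDB.

```python
class BaseModel(object):
    """Hypothetical stand-in for ndb.Model (assumption, not the real class)."""

    @classmethod
    def get_or_insert_async(cls, name, **kwargs):
        # The real NDB method returns a Future; this stub just records the call.
        return {"cls": cls.__name__, "name": name, "kwargs": kwargs}


class MyModel(BaseModel):

    @classmethod
    def make_string_id(cls, *parts):
        # Hypothetical id scheme: join the parts with a separator.
        return ":".join(str(p) for p in parts)

    @classmethod
    def get_or_insert_by_parts_async(cls, *parts, **kwargs):
        # New name, so the inherited get_or_insert_async keeps its meaning.
        # Keyword arguments are forwarded untouched to the original method.
        return cls.get_or_insert_async(cls.make_string_id(*parts), **kwargs)


result = MyModel.get_or_insert_by_parts_async("user", 42, namespace="demo")
```

Because the helper simply returns whatever the wrapped call returns, it would still hand back a future when the base class is the real ndb.Model, so callers could yield it from their own tasklets.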

For your second example, yielding things one at a time will force NDB to wait on each one's completion before starting the next - 'yield' is synonymous with 'wait'. Instead, execute the tasklet function for each element in the collection, then wait on them all (by calling yield on the list) at the same time.
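The difference between waiting inside the loop and waiting on the whole batch can be seen in a small runnable analogy. Note that this uses Python 3's asyncio rather than NDB (which needs the App Engine runtime), so it is a sketch of the same idea, not NDB code: await plays the role of yield, asyncio.gather plays the role of yielding a list of futures, and fake_rpc is a hypothetical stand-in for an RPC-backed call such as get_or_insert_async.

```python
import asyncio
import time


async def fake_rpc(data):
    # Hypothetical stand-in for an RPC-backed call; sleeps instead of doing I/O.
    await asyncio.sleep(0.1)
    return data * 2


async def one_at_a_time(items):
    # Waiting inside the loop: each call finishes before the next one begins.
    return [await fake_rpc(x) for x in items]


async def all_at_once(items):
    # Create every call first, then wait on the whole batch so the waits overlap.
    pending = [fake_rpc(x) for x in items]
    return await asyncio.gather(*pending)


def timed(coro):
    # Run a coroutine to completion and report how long it took.
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start


serial_result, serial_time = timed(one_at_a_time([1, 2, 3]))
batch_result, batch_time = timed(all_at_once([1, 2, 3]))
```

Both variants produce the same values; the batch version should take roughly as long as a single call, while the loop version takes about the sum of all of them.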

Upvotes: 1
