bija

Reputation: 83

How to structure beginning and ending synchronous calls using trio?

I'm asking for structured trio pseudo-code (actual trio function calls, with dummy worker-does-work-here fill-ins) so I can understand and try out good flow-control practices for switching between synchronous and asynchronous processing.

I want to do the following...

Aside: I know there are other ways I could achieve my overall goal than the clunky repeated rewrite of a json file -- but I'm not asking for that input; I really would like to understand trio well enough to be able to use it for this flow.

So, the processes that I want to be synchronous:

New to trio, I have working code here ... which I believe gets the next record-to-process synchronously (using a trio.Semaphore() technique). But I'm pretty sure I'm not saving the file synchronously.

When I learned Go a few years ago, I felt I grokked the approaches to interweaving synchronous and asynchronous calls -- but I'm not there yet with trio. Thanks in advance.

Upvotes: 2

Views: 584

Answers (2)

Zach Thompson

Reputation: 322

This code uses channels to multiplex requests to and from a pool of workers. Your code comments add the requirement that the posting rate be throttled, so read_entries sleeps after each send.

from random import random
import time

import asks
import trio

# Unbuffered channels: each send blocks until a receiver is ready.
snd_input, rcv_input = trio.open_memory_channel(0)
snd_output, rcv_output = trio.open_memory_channel(0)

async def read_entries():
    # Closing snd_input (on leaving the async with) ends the workers' loops.
    async with snd_input:
        for key_entry in range(10):
            print("reading", key_entry)
            await snd_input.send(key_entry)
            await trio.sleep(1)  # throttle the send rate

async def work(n):
    async for key_entry in rcv_input:
        print(f"w{n} {time.monotonic()} posting", key_entry)
        r = await asks.post(f"https://httpbin.org/delay/{5 * random()}")
        await snd_output.send((r.status_code, key_entry))

async def save_entries():
    async for entry in rcv_output:
        print("saving", entry)

async def main():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(read_entries)
        nursery.start_soon(save_entries)
        # Close snd_output once all workers are done, so save_entries exits.
        async with snd_output:
            async with trio.open_nursery() as workers:
                for n in range(3):
                    workers.start_soon(work, n)

trio.run(main)
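
The channel context managers drive the shutdown: when read_entries leaves its async with snd_input block the input channel closes, each worker's async for loop ends, the inner nursery exits, and closing snd_output in turn lets save_entries finish. No task needs an explicit stop signal.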

Upvotes: 1

ziirish

Reputation: 106

Here is how I would write the (pseudo-)code:

    async def process_file(input_file):
        # load the file synchronously
        with open(input_file) as fd:
            data = json.load(fd)

        # iterate over your dict asynchronously
        async with trio.open_nursery() as nursery:
            for key, sub in data.items():
                if sub['updated'] is None:
                    sub['updated'] = 'in_progress'
                    nursery.start_soon(post_update, {key: sub})

        # save your result json synchronously
        save_file(data, input_file)

trio guarantees that once you exit the async with block, every task you spawned is complete, so you can safely save your file: no more updates will occur.
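
A minimal standalone sketch of that guarantee (with a dummy worker, not the question's code):

    import trio

    async def worker(i):
        await trio.sleep(0.1 * i)
        print("worker", i, "done")

    async def main():
        async with trio.open_nursery() as nursery:
            for i in range(3):
                nursery.start_soon(worker, i)
        # This line runs only after all three workers have finished.
        print("all tasks complete; safe to save the file")

    trio.run(main)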

I also removed the grab_next_entry function: as far as I can tell it re-scans the same keys from the start on every call, giving O(n²) complexity overall, while you can simplify it by just iterating over your dict once (dropping the complexity to O(n)).
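
To make the comparison concrete, here is a hypothetical sketch of the two access patterns (grab_next_entry isn't shown in the question, so its body here is a guess):

    # Hypothetical shape of grab_next_entry: each call re-scans from the start.
    def grab_next_entry(data):
        for key, sub in data.items():   # O(n) scan per call
            if sub['updated'] is None:
                return key              # called n times -> O(n^2) total

    # Single pass instead: visit each entry exactly once -> O(n) total.
    def pending_entries(data):
        return [key for key, sub in data.items() if sub['updated'] is None]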

You don't need the Semaphore either, unless you want to limit the number of parallel post_update calls. But trio offers a built-in mechanism for this as well, thanks to its CapacityLimiter, which you would use like this:

    limit = trio.CapacityLimiter(10)
    async with trio.open_nursery() as nursery:
        async with limit:
            for x in z:
                nursery.start_soon(func, x)

UPDATE thanks to @njsmith's comment: the snippet above acquires the limiter only once, in the parent task, so it doesn't actually cap the number of concurrent tasks.

So, in order to limit the amount of concurrent post_update calls, you'll rewrite it like this:

    async def post_update(data, limit):
        async with limit:
            ...

And then you can rewrite the previous loop like this:

    limit = trio.CapacityLimiter(10)
    # iterate over your dict asynchronously
    async with trio.open_nursery() as nursery:
        for key, sub in data.items():
            if sub['updated'] is None:
                sub['updated'] = 'in_progress'
                nursery.start_soon(post_update, {key: sub}, limit)

This way, we spawn n tasks for the n entries in your data dict, but if more than 10 tasks are running concurrently, the extra ones will have to wait for the limit to be released (at the end of the async with limit block).
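
Putting it together, a minimal runnable sketch of the whole flow might look like this (post_update is stubbed with a sleep, and the file name is a placeholder):

    import json
    import trio

    limit = trio.CapacityLimiter(10)   # at most 10 concurrent post_update calls

    async def post_update(data, limit):
        async with limit:
            await trio.sleep(1)        # stand-in for the real HTTP POST
            for sub in data.values():
                sub['updated'] = 'done'

    async def process_file(input_file):
        # load the file synchronously
        with open(input_file) as fd:
            data = json.load(fd)

        # update all pending entries asynchronously, at most 10 at a time
        async with trio.open_nursery() as nursery:
            for key, sub in data.items():
                if sub['updated'] is None:
                    sub['updated'] = 'in_progress'
                    nursery.start_soon(post_update, {key: sub}, limit)

        # save the result synchronously; every task has finished by now
        with open(input_file, 'w') as fd:
            json.dump(data, fd)

    trio.run(process_file, 'entries.json')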

Upvotes: 2
