Reputation: 83
I'm asking for structured trio pseudo-code (actual trio function calls, but with dummy worker-does-work-here fill-ins) so I can understand and try out good flow-control practices for switching between synchronous and asynchronous processing.
I want to do the following...
Aside: I know there are other ways I could achieve my overall goal than the clunky repeated rewrite of a json file -- but I'm not asking for that input; I really would like to understand trio well enough to be able to use it for this flow.
So, the processes that I want to be synchronous:
New to trio, I have working code here ... which I believe is getting the next record-to-process synchronously (using a trio.Semaphore() technique). But I'm pretty sure I'm not saving the file synchronously.
When I learned Go a few years ago, I felt I grokked the approaches to interweaving synchronous and asynchronous calls -- but I'm not there yet with trio. Thanks in advance.
Upvotes: 2
Views: 584
Reputation: 322
This code uses channels to multiplex requests to and from a pool of workers. I found the additional requirement (in your code comments) that the post rate is throttled, so read_entries sleeps after each send.
from random import random
import time, asks, trio

# Zero-capacity channels: every send blocks until a receiver is ready.
snd_input, rcv_input = trio.open_memory_channel(0)
snd_output, rcv_output = trio.open_memory_channel(0)

async def read_entries():
    # Closing snd_input on exit ends the workers' `async for` loops.
    async with snd_input:
        for key_entry in range(10):
            print("reading", key_entry)
            await snd_input.send(key_entry)
            await trio.sleep(1)  # throttle the rate of posting

async def work(n):
    async for key_entry in rcv_input:
        print(f"w{n} {time.monotonic()} posting", key_entry)
        r = await asks.post(f"https://httpbin.org/delay/{5 * random()}")
        await snd_output.send((r.status_code, key_entry))

async def save_entries():
    async for entry in rcv_output:
        print("saving", entry)

async def main():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(read_entries)
        nursery.start_soon(save_entries)
        # Close snd_output once all workers are done so save_entries ends.
        async with snd_output:
            async with trio.open_nursery() as workers:
                for n in range(3):
                    workers.start_soon(work, n)

trio.run(main)
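Note the backpressure this design buys you: because both channels have zero capacity, read_entries can never run ahead of the workers, and a worker cannot complete a send until save_entries is ready to receive, so nothing piles up in memory between the stages.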
Upvotes: 1
Reputation: 106
Here is how I would write the (pseudo-)code:
async def process_file(input_file):
    # load the file synchronously
    with open(input_file) as fd:
        data = json.load(fd)

    # iterate over your dict asynchronously
    async with trio.open_nursery() as nursery:
        for key, sub in data.items():
            if sub['updated'] is None:
                sub['updated'] = 'in_progress'
                nursery.start_soon(post_update, {key: sub})

    # save your result json synchronously
    save_file(data, input_file)
trio guarantees that once you exit the async with block, every task you spawned is complete, so you can safely save your file: no more updates will occur.
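The post_update and save_file helpers are left as stubs above. A minimal sketch of what they might look like -- the trio.sleep placeholder stands in for the real POST request, and the 'done' marker is an assumption, not part of this answer:

import json
import trio

async def post_update(entry):
    # Placeholder for the real POST request; a trio-compatible
    # HTTP client such as asks would be awaited here instead.
    await trio.sleep(1)
    for sub in entry.values():
        sub['updated'] = 'done'  # mutates the same dict object held by `data`

def save_file(data, input_file):
    # Plain synchronous rewrite of the json file.
    with open(input_file, 'w') as fd:
        json.dump(data, fd, indent=2)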
I also removed the grab_next_entry function, because it seems to me that this function will iterate over the same keys (incrementally) at each call, giving O(n²) complexity, while you could simplify it by just iterating over your dict once (dropping the complexity to O(n)).
You don't need the Semaphore either, except if you want to limit the number of parallel post_update calls. But trio offers a builtin mechanism for this as well, thanks to its CapacityLimiter, which you would use like this:
limit = trio.CapacityLimiter(10)

async with trio.open_nursery() as nursery:
    async with limit:
        for x in z:
            nursery.start_soon(func, x)
UPDATE (thanks to @njsmith's comment): the snippet above holds the limiter only while spawning the tasks, not while they run, so it doesn't actually bound the concurrency. In order to limit the number of concurrent post_update calls, you'll rewrite it like this:
async def post_update(data, limit):
    async with limit:
        ...
And then you can rewrite the previous loop like this:
limit = trio.CapacityLimiter(10)

# iterate over your dict asynchronously
async with trio.open_nursery() as nursery:
    for key, sub in data.items():
        if sub['updated'] is None:
            sub['updated'] = 'in_progress'
            nursery.start_soon(post_update, {key: sub}, limit)
This way, we spawn n tasks for the n entries in your data-dict, but if there are more than 10 tasks running concurrently, the extra ones will have to wait for the limit to be released (at the end of the async with limit block).
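Putting it all together, here is a sketch of the complete flow with the limiter threaded through. The post_update body and the 'entries.json' filename are illustrative placeholders, not from the original answer:

import json
import trio

async def post_update(entry, limit):
    async with limit:  # at most 10 of these bodies run at once
        await trio.sleep(1)  # placeholder for the real POST request
        for sub in entry.values():
            sub['updated'] = 'done'

async def process_file(input_file):
    # load the file synchronously
    with open(input_file) as fd:
        data = json.load(fd)

    limit = trio.CapacityLimiter(10)

    # spawn one task per pending entry; the limiter caps concurrency
    async with trio.open_nursery() as nursery:
        for key, sub in data.items():
            if sub['updated'] is None:
                sub['updated'] = 'in_progress'
                nursery.start_soon(post_update, {key: sub}, limit)

    # every task is complete once the nursery block exits, so saving is safe
    with open(input_file, 'w') as fd:
        json.dump(data, fd, indent=2)

trio.run(process_file, 'entries.json')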
Upvotes: 2