Reputation: 486
I wrote a script that uses a nursery and the asks module to loop through and call an API based upon the loop variables. I get responses but don't know how to return the data like you would with asyncio.
I also have a question about limiting the API calls to 5 per second.
```python
from datetime import datetime
import asks
import time
import trio

asks.init("trio")
s = asks.Session(connections=4)

async def main():
    start_time = time.time()

    api_key = 'API-KEY'
    org_id = 'ORG-ID'
    networkIds = ['id1','id2','idn']

    url = 'https://api.meraki.com/api/v0/networks/{0}/airMarshal?timespan=3600'
    headers = {'X-Cisco-Meraki-API-Key': api_key, 'Content-Type': 'application/json'}

    async with trio.open_nursery() as nursery:
        for i in networkIds:
            nursery.start_soon(fetch, url.format(i), headers)

    print("Total time:", time.time() - start_time)

async def fetch(url, headers):
    print("Start: ", url)
    response = await s.get(url, headers=headers)
    print("Finished: ", url, len(response.content), response.status_code)

if __name__ == "__main__":
    trio.run(main)
```
When I run nursery.start_soon(fetch...), I am printing data within fetch, but how do I return the data? I didn't see anything similar to the asyncio.gather(*tasks) function.
Also, I can limit the number of connections to 1-4, which helps keep me below the limit of 5 API calls per second, but I was wondering if there is a built-in way to ensure that no more than 5 API calls are made in any given second?
Upvotes: 18
Views: 4879
Reputation: 26911
> When I run nursery.start_soon(fetch...), I am printing data within fetch, but how do I return the data? I didn't see anything similar to asyncio.gather(*tasks) function.
You're asking two different questions, so I'll just answer this one. Matthias already answered your other question.
When you call start_soon(), you are asking Trio to run the task in the background and then keep going. This is why Trio is able to run fetch() several times concurrently. But because Trio keeps going, there is no way to "return" the result the way a Python function normally would. Where would it even return to?
You can use a queue to let fetch() tasks send results to another task for additional processing.
To create a queue:
```python
response_queue = trio.Queue(capacity=len(networkIds))
```
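When you start your fetch tasks, pass the queue as an argument, and send a sentinel to the queue once the nursery has exited (that is, after every fetch task has finished):
```python
async with trio.open_nursery() as nursery:
    for i in networkIds:
        nursery.start_soon(fetch, url.format(i), headers, response_queue)
# The nursery block only exits after every fetch task has finished,
# so the sentinel is the last item put into the queue.
await response_queue.put(None)
```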
After you download a URL, put the response into the queue:
```python
async def fetch(url, headers, response_queue):
    print("Start: ", url)
    response = await s.get(url, headers=headers)
    # Add responses to queue
    await response_queue.put(response)
    print("Finished: ", url, len(response.content), response.status_code)
```
With the changes above, your fetch tasks will put responses into the queue. Now you need to read responses from the queue so you can process them. You might add a new function to do this:
```python
async def process(response_queue):
    async for response in response_queue:
        if response is None:
            break
        # Do whatever processing you want here.
```
You should start this process function as a background task before you start any fetch tasks so that it will process responses as soon as they are received.
Read more in the Synchronizing and Communicating Between Tasks section of the Trio documentation.
Upvotes: 5
Reputation: 832
As @Adrien Clerc's answer states, trio.Queue has been deprecated: https://trio.readthedocs.io/en/stable/history.html?highlight=trio.Queue#id40
For task communication in Trio see: https://trio.readthedocs.io/en/latest/reference-core.html#using-channels-to-pass-values-between-tasks
Here's a minimal, fully working example for your use case using open_memory_channel (the asynchronous URL request is replaced with a sleep):
```python
import datetime
import trio

async def main():
    network_ids = ["id1", "id2", "idn"]
    url = "https://api.meraki.com/api/v0/networks/{0}/airMarshal?timespan=3600"
    send_channel, receive_channel = trio.open_memory_channel(len(network_ids))
    async with trio.open_nursery() as nursery:
        nursery.start_soon(producer, send_channel, url, network_ids)
        nursery.start_soon(consumer, receive_channel)

async def producer(send_channel, url, network_ids):
    async with send_channel:
        async with trio.open_nursery() as nursery:
            for i in network_ids:
                nursery.start_soon(fetch, send_channel, url.format(i))

async def consumer(receive_channel):
    async with receive_channel:
        async for value in receive_channel:
            # Do your job here
            print(f"value received: {value} at time {datetime.datetime.utcnow()}")

async def fetch(send_channel, url):
    print(f"Start: {datetime.datetime.utcnow()}")
    await trio.sleep(1)
    response = f"response for {url}"
    await send_channel.send(response)
    print(f"Finished: {datetime.datetime.utcnow()}")

if __name__ == "__main__":
    trio.run(main)
```
This prints:
```
Start: 2023-03-03 10:21:24.883787
Start: 2023-03-03 10:21:24.883787
Start: 2023-03-03 10:21:24.883787
Finished: 2023-03-03 10:21:25.887040
Finished: 2023-03-03 10:21:25.887040
Finished: 2023-03-03 10:21:25.887040
value received: response for https://api.meraki.com/api/v0/networks/id1/airMarshal?timespan=3600 at time 2023-03-03 10:21:25.887040
value received: response for https://api.meraki.com/api/v0/networks/id2/airMarshal?timespan=3600 at time 2023-03-03 10:21:25.887040
value received: response for https://api.meraki.com/api/v0/networks/idn/airMarshal?timespan=3600 at time 2023-03-03 10:21:25.887040
```
Upvotes: 2
Reputation: 10962
Based on these answers, you can define the following function:
```python
async def gather(*tasks):
    async def collect(index, task, results):
        task_func, *task_args = task
        results[index] = await task_func(*task_args)

    results = {}
    async with trio.open_nursery() as nursery:
        for index, task in enumerate(tasks):
            nursery.start_soon(collect, index, task, results)
    return [results[i] for i in range(len(tasks))]
```
You can then use trio much like asyncio by patching the gather function onto the trio module. Note that, unlike asyncio.gather, it takes (function, *args) tuples rather than coroutine objects:
```python
import trio
trio.gather = gather
```
Here is a practical example:
```python
async def child(x):
    print(f"Child sleeping {x}")
    await trio.sleep(x)
    return 2*x

async def parent():
    tasks = [(child, t) for t in range(3)]
    return await trio.gather(*tasks)

print("results:", trio.run(parent))  # results: [0, 2, 4]
```
Upvotes: 7
Reputation: 2797
Technically, trio.Queue has been deprecated in trio 0.9. It has been replaced by trio.open_memory_channel.
Short example:
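```python
sender, receiver = trio.open_memory_channel(len(networkIds))
# Close the sender only after every fetch task has finished;
# otherwise the `async for` below never ends.
async with sender:
    async with trio.open_nursery() as nursery:
        for i in networkIds:
            nursery.start_soon(fetch, sender, url.format(i), headers)
# The buffer holds one value per network, so no send() blocked above.
async for value in receiver:
    # Do your job here
    pass
```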
And in your fetch function you should call await sender.send(value) somewhere.
Upvotes: 6
Reputation: 2544
Returning data: pass the networkID and a dict to the fetch tasks:
```python
async def main():
    …
    results = {}
    async with trio.open_nursery() as nursery:
        for i in networkIds:
            nursery.start_soon(fetch, url.format(i), headers, results, i)
    ## results are available here
```
```python
async def fetch(url, headers, results, i):
    print("Start: ", url)
    response = await s.get(url, headers=headers)
    print("Finished: ", url, len(response.content), response.status_code)
    results[i] = response
```
Alternatively, create a trio.Queue to which you put the results; your main task can then read the results from the queue.
API limit: create a trio.Queue(10) and start a task along these lines:
```python
async def limiter(queue):
    while True:
        await trio.sleep(0.2)
        await queue.put(None)
```
Pass that queue to fetch as another argument (say, limit_queue), and call await limit_queue.get() before each API call.
Upvotes: 7