Reputation: 61
I'm running the following, using requests-threads:
def perform_requests():
    prepared_requests = [...]
    session = AsyncSession(n=100)
    results = []

    async def _perform_requests():
        for request in prepared_requests:
            results.append(session.request(**request))
        for i, result in enumerate(results):
            results[i] = await asyncio.ensure_future(results[i])

    session.run(_perform_requests)
    return results
However, when I run it a couple of odd things happen. First, I get loads of messages like:
(WARNING) Connection pool is full, discarding connection:
And second, I get this error:
results[i] = await asyncio.ensure_future(results[i])
builtins.AssertionError: yield from wasn't used with future
I'm using ensure_future(), so what is going on?
Upvotes: 0
Views: 609
Reputation: 1121476
The session.request() method returns a Twisted Deferred object (the requests-threads code calls twisted.internet.threads.deferToThread()). You generally don't want to treat this as an asyncio task, certainly not when running under the Twisted reactor.
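asyncio.ensure_future() only knows about asyncio's own awaitables (coroutines, Tasks and Futures); anything else takes a different code path, which is where the confusing assertion error comes from. As a rough illustration using plain asyncio (the bare object() below is a hypothetical stand-in for a foreign result, not an actual Deferred):

```python
import asyncio

async def main():
    async def coro():
        return 42

    # ensure_future() happily wraps an asyncio coroutine in a Task
    # that can be awaited.
    task = asyncio.ensure_future(coro())
    assert await task == 42

    # But an object that is not an asyncio awaitable is rejected
    # outright rather than wrapped.
    try:
        asyncio.ensure_future(object())
    except TypeError:
        return "rejected"

print(asyncio.run(main()))  # rejected
```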
Instead, you could use twisted.internet.defer.gatherResults() to execute the requests concurrently and to gather the responses.
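defer.gatherResults() is Twisted's analogue of asyncio.gather(): it takes a list of pending results and fires with a list of their values, in the order they were passed in rather than the order they complete. A rough sketch of those semantics using plain asyncio (no Twisted required; fetch() is a hypothetical stand-in for one request):

```python
import asyncio

async def fetch(i):
    # Stand-in for one outstanding request.
    await asyncio.sleep(0)
    return i * 10

async def main():
    pending = [fetch(i) for i in range(3)]
    # Like defer.gatherResults(), gather() waits for all the results
    # and preserves the input order.
    return await asyncio.gather(*pending)

print(asyncio.run(main()))  # [0, 10, 20]
```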
Next, session.run() calls twisted.internet.task.react(), which will always exit Python:

[...] this function will also:
[...]
- Exit the application when done, with exit code 0 in case of success and 1 in case of failure.

(Emphasis mine.)
This means that even if your code works, the return results line is never reached.
If you move the session.run() call out to be the top-level entry point for your application, then things work:
from requests_threads import AsyncSession
from twisted.internet import defer

session = AsyncSession(n=100)

async def perform_requests():
    prepared_requests = [...]
    requests = [session.request(**request) for request in prepared_requests]
    responses = await defer.gatherResults(requests)
    print(responses)

session.run(perform_requests)
but it does exit immediately after printing the responses list.
Otherwise, you'll have to manage the Twisted reactor directly (using reactor.run() and a callback that calls reactor.stop() once the responses are done); e.g.:
from requests_threads import AsyncSession
from twisted.internet import defer, error, reactor

def perform_requests():
    prepared_requests = [...]
    session = AsyncSession(n=100)
    results = []

    async def gather_responses():
        requests = [session.request(**request) for request in prepared_requests]
        results[:] = await defer.gatherResults(requests)
        try:
            reactor.stop()
        except error.ReactorNotRunning:
            pass

    deferred = defer.ensureDeferred(gather_responses())
    reactor.run()
    return results

print(perform_requests())
If you need to run multiple tasks on the Twisted reactor, you'd use a single top-level function and rely on callbacks to let you know when responses are complete.
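That callback style is the same idea as asyncio's Future.add_done_callback(): rather than returning the responses from a function, you register a function to be invoked once they are ready. A minimal asyncio illustration of the pattern (work() and on_done() are hypothetical names, not part of any library):

```python
import asyncio

collected = []

def on_done(fut):
    # Invoked by the event loop once the task has a result; nothing is
    # "returned" to a caller -- the callback consumes the value instead.
    collected.append(fut.result())

async def main():
    async def work():
        await asyncio.sleep(0)
        return ["response-1", "response-2"]

    task = asyncio.ensure_future(work())
    task.add_done_callback(on_done)
    await task
    await asyncio.sleep(0)  # yield once so the scheduled callback runs

asyncio.run(main())
print(collected)  # [['response-1', 'response-2']]
```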
Personally, I think you'd be much better off using the aiohttp.client module to run asynchronous requests under the Python asyncio event loop:
import asyncio
import aiohttp

async def perform_requests():
    prepared_requests = [...]
    conn = aiohttp.TCPConnector(limit=100)
    # ClientSession is an *async* context manager; a plain `with` fails.
    async with aiohttp.ClientSession(connector=conn) as session:
        requests = [session.request(**request) for request in prepared_requests]
        responses = await asyncio.gather(*requests)
        print(responses)

if __name__ == '__main__':
    asyncio.run(perform_requests())
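Here TCPConnector(limit=100) caps the number of open connections, playing the role the urllib3 connection pool did in your requests-based code. If you'd rather cap concurrency at the task level, an asyncio.Semaphore works too; a hypothetical sketch (bounded() and work() are illustrative names, not aiohttp API):

```python
import asyncio

async def bounded(sem, coro):
    # At most the semaphore's initial value of these bodies run at once.
    async with sem:
        return await coro

async def main():
    sem = asyncio.Semaphore(2)  # allow two "requests" in flight at a time
    async def work(i):
        await asyncio.sleep(0)
        return i
    return await asyncio.gather(*(bounded(sem, work(i)) for i in range(5)))

print(asyncio.run(main()))  # [0, 1, 2, 3, 4]
```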
Note that asyncio.run() requires Python 3.7 or newer; your error message suggests you are still using Python 3.5 or 3.6. A work-around would be to use loop.run_until_complete():
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(perform_requests())
Upvotes: 1