Tom White

Reputation: 61

yield from wasn't used with future

I'm running the following, using requests-threads:

import asyncio
from requests_threads import AsyncSession

def perform_requests():
    prepared_requests = [...]
    session = AsyncSession(n=100)
    results = []

    async def _perform_requests():
        for request in prepared_requests:
            results.append(session.request(**request))
        for i, result in enumerate(results):
            results[i] = await asyncio.ensure_future(results[i])

    session.run(_perform_requests)
    return results

However, when I run it a couple of odd things happen. First, I get loads of messages like:

(WARNING) Connection pool is full, discarding connection:

And secondly I get this error:

results[i] = await asyncio.ensure_future(results[i])
builtins.AssertionError: yield from wasn't used with future

I'm using ensure_future(), so what is going on?

Upvotes: 0

Views: 609

Answers (1)

Martijn Pieters

Reputation: 1121476

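The session.request() method returns a Twisted Deferred object (the requests-threads code calls twisted.internet.threads.deferToThread()). You don't want to treat this as an asyncio task, not while your coroutine is running under the Twisted reactor. Your coroutine is driven by Twisted, not by the asyncio event loop: asyncio.ensure_future() wraps the Deferred in an asyncio Task attached to an event loop that never runs, and awaiting that task makes it yield itself to whatever drives the coroutine, expecting asyncio to resume the coroutine once the task is done. Twisted's coroutine runner doesn't know about asyncio futures, so it resumes the coroutine straight away; the task is still pending, and the future's "is it done?" check raises the AssertionError you see.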
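To see where that error comes from without needing Twisted installed, here is a stdlib-only sketch. drive_without_asyncio() is a hypothetical, much-simplified stand-in for Twisted's coroutine runner (not Twisted's actual code): like Twisted, it resumes the coroutine immediately with whatever was yielded. Awaiting a still-pending asyncio future under such a driver trips the same check; on Python 3.7+ the message is "await wasn't used with future" (a RuntimeError), the successor to the 3.5/3.6 AssertionError.

```python
import asyncio


def drive_without_asyncio(coro):
    """Hypothetical driver standing in for Twisted's coroutine runner.

    It knows nothing about asyncio futures: whatever the coroutine yields
    is sent straight back in, so the coroutine is resumed immediately.
    """
    value = None
    try:
        while True:
            value = coro.send(value)
    except StopIteration as stop:
        return stop.value


async def await_future(fut):
    # Same shape as `await asyncio.ensure_future(...)` in the question.
    return await fut


def reproduce():
    loop = asyncio.new_event_loop()
    try:
        fut = loop.create_future()  # pending; this loop is never run
        try:
            drive_without_asyncio(await_future(fut))
        except (AssertionError, RuntimeError) as exc:
            # AssertionError on Python 3.5/3.6, RuntimeError on 3.7+
            return exc
    finally:
        loop.close()


if __name__ == "__main__":
    exc = reproduce()
    print(type(exc).__name__, exc)
```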

Instead, you could use twisted.internet.defer.gatherResults() to execute the requests concurrently and to gather the responses.

Next, session.run() calls twisted.internet.task.react(), which will always exit Python:

[...] this function will also:

[...]

  • Exit the application when done, with exit code 0 in case of success and 1 in case of failure.


This means that even if your code works, the return results line is never reached.
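The effect can be sketched with the standard library alone. react_like() below is a hypothetical stand-in for react() that keeps only its final step, a sys.exit() call; because sys.exit() raises SystemExit, the return statement after it never runs.

```python
import sys


def react_like(main):
    """Hypothetical stand-in for twisted.internet.task.react().

    The real react() runs the reactor until `main` is done and then calls
    sys.exit(); this sketch keeps only that last part.
    """
    main()
    sys.exit(0)


def perform_requests():
    results = []

    def _perform_requests():
        results.append("response")  # placeholder for the real requests

    react_like(_perform_requests)
    return results  # never reached: sys.exit() raised SystemExit above
```

Calling perform_requests() here raises SystemExit(0) instead of returning the list, which is what happens to return results in the question's code.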

If you move the session.run() call out so that it becomes the top-level entry point for your application, things work:

from requests_threads import AsyncSession
from twisted.internet import defer

session = AsyncSession(n=100)

async def perform_requests():
    prepared_requests = [...]
    # session.request() returns Deferreds; gatherResults() waits for all of them
    requests = [session.request(**request) for request in prepared_requests]
    responses = await defer.gatherResults(requests)
    print(responses)

# session.run() calls twisted.internet.task.react(), which exits Python when done
session.run(perform_requests)

but the program does still exit immediately after printing the responses list.

Otherwise, you'll have to manage the twisted reactor directly (using reactor.run() and a callback that calls reactor.stop() once the responses are done); e.g.:

from requests_threads import AsyncSession
from twisted.internet import defer, error, reactor

def perform_requests():
    prepared_requests = [...]
    session = AsyncSession(n=100)
    results = []

    async def gather_responses():
        try:
            requests = [session.request(**request) for request in prepared_requests]
            results[:] = await defer.gatherResults(requests)
        finally:
            # stop the reactor (even if a request failed) so reactor.run() returns
            try:
                reactor.stop()
            except error.ReactorNotRunning:
                pass

    # wrap the coroutine in a Deferred; the Deferreds it awaits only fire
    # once reactor.run() is running
    defer.ensureDeferred(gather_responses())
    reactor.run()
    return results

print(perform_requests())

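Because the Twisted reactor can't be restarted once it has stopped, the function above can only be called once per process. If you need to run multiple tasks on the Twisted reactor, you'd either have to use a single top-level function, or rely on callbacks to let you know when responses are complete.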

Personally, I think you'd be much better off using the aiohttp.client module to run asynchronous requests under the Python asyncio event loop:

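import asyncio
import aiohttp

async def perform_requests():
    # each dict holds aiohttp request() keyword arguments (method, url, ...)
    prepared_requests = [...]
    # cap the connection pool at 100 concurrent connections
    conn = aiohttp.TCPConnector(limit=100)

    # ClientSession must be used as an *async* context manager
    async with aiohttp.ClientSession(connector=conn) as session:
        requests = [session.request(**request) for request in prepared_requests]
        responses = await asyncio.gather(*requests)

        print(responses)

if __name__ == '__main__':
    asyncio.run(perform_requests())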

Note that asyncio.run() requires Python 3.7 or newer; your error message suggests you are still using 3.5 or 3.6. A work-around would be to use loop.run_until_complete():

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(perform_requests())
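If the same script has to run on both older and newer Python versions, one option is to pick the runner at runtime. This is a sketch only: fetch() is a hypothetical placeholder for a real request (it just yields to the event loop once), and run() is an illustrative helper, not an asyncio API.

```python
import asyncio
import sys


async def fetch(i):
    # Hypothetical stand-in for a real request; just yields to the loop once.
    await asyncio.sleep(0)
    return i * 2


async def perform_requests():
    # gather() runs the coroutines concurrently and keeps results in order
    return await asyncio.gather(*(fetch(i) for i in range(3)))


def run(coro_fn):
    """Run a coroutine function on Python 3.5+ (illustrative helper)."""
    if sys.version_info >= (3, 7):
        return asyncio.run(coro_fn())
    # Python 3.5/3.6: no asyncio.run(), fall back to run_until_complete()
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(coro_fn())


if __name__ == "__main__":
    print(run(perform_requests))  # [0, 2, 4]
```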

Upvotes: 1
