sf8193

Reputation: 635

Running asyncio task concurrently and in background with "create_task" raises "no running event loop"

My code looks like this:

TDSession = TDClient()
TDSession.grab_refresh_token()
q = queue.Queue(10)
asyncio.run(listener.startStreaming(TDSession, q))
while True:
    message = q.get()
    print('oh shoot!')
    print(message)
    orderEntry.placeOrder(TDSession=TDSession)

I have tried calling asyncio.create_task(listener.startStreaming(TDSession, q)) instead, but then I get:

RuntimeError: no running event loop
sys:1: RuntimeWarning: coroutine 'startStreaming' was never awaited

This confused me, because the same approach seemed to work in the question "Can an asyncio event loop run in the background without suspending the Python interpreter?", which is what I'm trying to do.

With the listener.startStreaming function looking like this:

async def startStreaming(TDSession, q):
    streamingClient = TDSession.create_streaming_session()
    streamingClient.account_activity()
    await streamingClient.build_pipeline()
    while True:
        message = await streamingClient.start_pipeline()
        message = parseMessage(message)
        if message != None:
            print('putting message into q')
            print( dict(message) )
            q.put(message)

Is there a way to make this work where I can run the listener in the background?

I've tried this as well, but it only runs the consumer function instead of running both at the same time:

TDSession.grab_refresh_token()
q = queue.Queue(10)
loop = asyncio.get_event_loop()
loop.create_task(listener.startStreaming(TDSession, q))
loop.create_task(consumer(TDSession, q))
loop.run_forever()

Upvotes: 3

Views: 3842

Answers (1)

Maurice Lam

Reputation: 1794

As you found out, asyncio.run runs the given coroutine until it completes. In other words, it waits for the coroutine returned by listener.startStreaming to finish before proceeding to the next line. Since startStreaming loops forever, the while loop after it is never reached.
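To see the blocking behaviour in isolation, here is a minimal sketch. The coroutine short_job and the events list are made up for illustration; short_job stands in for a coroutine that, unlike startStreaming, eventually returns.

```python
import asyncio

events = []

async def short_job():
    # Hypothetical stand-in for a coroutine that finishes on its own.
    await asyncio.sleep(0.01)
    events.append("job finished")
    return "done"

# asyncio.run() does not return until short_job() has completed.
result = asyncio.run(short_job())
events.append("line after asyncio.run")
```

The "line after asyncio.run" entry is only appended once the coroutine is done, which is why code placed after asyncio.run(listener.startStreaming(...)) never gets a turn.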

asyncio.create_task, on the other hand, requires the caller to already be running inside an asyncio event loop. From the docs:

The task is executed in the loop returned by get_running_loop(), RuntimeError is raised if there is no running loop in current thread.
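A small sketch of that rule, assuming nothing beyond the standard library. The names work, schedule_outside_loop and schedule_inside_loop are invented for this example.

```python
import asyncio

async def work():
    # Hypothetical coroutine used only for this demonstration.
    await asyncio.sleep(0)
    return 42

def schedule_outside_loop():
    # Plain function: no event loop is running in this thread.
    coro = work()
    try:
        asyncio.create_task(coro)
    except RuntimeError as exc:
        coro.close()  # avoids the "coroutine ... was never awaited" warning
        return str(exc)
    return None

async def schedule_inside_loop():
    # Inside a coroutine driven by asyncio.run(), a loop is running.
    task = asyncio.create_task(work())
    return await task

outside_error = schedule_outside_loop()
inside_result = asyncio.run(schedule_inside_loop())
```

The first call fails with the same RuntimeError shown in the question, while the second succeeds because asyncio.run has already started a loop by the time create_task is called.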

What you need is to combine the two: write an async function, call create_task inside it, and pass that function to asyncio.run.

For example:

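import asyncio

async def main():
  TDSession = TDClient()
  TDSession.grab_refresh_token()
  # asyncio.Queue, not queue.Queue: its get() and put() are coroutines,
  # so startStreaming must call `await q.put(message)` as well.
  q = asyncio.Queue(10)
  # Schedule the listener; it starts running the first time main() awaits.
  streaming_task = asyncio.create_task(listener.startStreaming(TDSession, q))
  while True:
    message = await q.get()  # yields to the event loop while the queue is empty
    print('oh shoot!')
    print(message)
    orderEntry.placeOrder(TDSession=TDSession)

  # Only reached if the loop above exits (e.g. via `break`);
  # waits for `startStreaming` to complete.
  await streaming_task

if __name__ == '__main__':
  asyncio.run(main())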

Edit: From your comment I realized you want the producer-consumer pattern, so I updated the example above to use asyncio.Queue instead of queue.Queue. A blocking queue.Queue.get() would stall the single thread the event loop runs on, whereas awaiting asyncio.Queue.get() lets the event loop switch between the producer (startStreaming) and the consumer (the while loop).
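Since TDClient and listener are not available here, the following is a self-contained sketch of the same producer-consumer shape. The producer coroutine, the integer messages and the None sentinel are assumptions made for the demo, not part of the TD streaming API.

```python
import asyncio

async def producer(q, n):
    # Hypothetical stand-in for startStreaming: emits n messages, then stops.
    for i in range(n):
        await asyncio.sleep(0)  # stands in for awaiting the stream
        await q.put(i)          # asyncio.Queue.put is a coroutine
    await q.put(None)           # sentinel telling the consumer to stop

async def main():
    q = asyncio.Queue(10)
    producer_task = asyncio.create_task(producer(q, 3))
    received = []
    while True:
        message = await q.get()  # suspends here, letting the producer run
        if message is None:
            break
        received.append(message)
    await producer_task          # reachable now that the loop can exit
    return received

received = asyncio.run(main())
```

Note that the producer uses await q.put(...). With an asyncio.Queue, a bare q.put(message) only creates a coroutine object and never enqueues anything, so startStreaming would need the same change.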

Upvotes: 2
