Reputation: 21
I've been trying, with no luck, to create an endless chain of client instances.
I'm developing an asyncio app. Among many other things, such as running a server with loop.create_server(), it needs to connect to a list of servers every 10 seconds, send some data, and then disconnect.
I keep getting 2 errors: "RuntimeError: Event loop is running." or "asyncio > Task was destroyed but it is pending!"
The code below works.
import asyncio
from tcp.client import Client

def send_to_peers(data):
    for index in range(1, 3):  # this for-loop is just for simulating a list of peers
        try:
            loop = asyncio.get_event_loop()
            coro = loop.create_connection(lambda: Client(), '127.0.0.1', 10000 + index)
            _, proto = loop.run_until_complete(coro)
            msg = data + "-" + str(index) + "\n"
            proto.transport.write(str.encode(msg))
            proto.transport.close()
        except ConnectionRefusedError as exc:
            print(exc)

def infinite():
    for index in range(5):  # again, this should be a while True:
        # there should be an asyncio.sleep(10) here
        send_to_peers(str(index))

infinite()
But when I call it from the main_loop things start to break.
async def infinite_loop():
    for index in range(5):
        print("loop n " + str(index))
        task = asyncio.Task(send_to_peers(str(index)))
        await asyncio.sleep(10)
        task.cancel()
        with suppress(asyncio.CancelledError):
            await task

main_loop = asyncio.get_event_loop()
main_loop.run_until_complete(infinite_loop())
main_loop.run_forever()
I've tried passing the main_loop to send_to_peers, passing it to the Client(loop) class, stopping and closing the loop, deleting the task, and using weird combinations of ensure_future, but nothing works.
I googled as much as I could. I read that it is not good to nest infinite loops, but I didn't find any other way.
My last hope is to use threading, but even if it works, it won't be an elegant solution, nor the right one.
I'm used to working with Node, so please excuse me if I made a silly mistake. I thought that after 2 weeks I could make it work, but here I am.
I would really appreciate any help. I'm stuck. Thanks!
PS: The Client() class is very basic:
import asyncio
import logging
import sys

logging.basicConfig(
    level=logging.DEBUG,
    format='%(name)s > %(message)s',
    stream=sys.stderr
)

class Client(asyncio.Protocol):
    def __init__(self):
        self.log = logging.getLogger('client')
        self.address = None
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        self.address = transport.get_extra_info('peername')
        self.log.debug('{}:{} connected'.format(*self.address))

    def data_received(self, data):
        self.log.debug('{}:{} just sent {!r}'.format(*self.address, data))

    def eof_received(self):
        self.log.debug('{}:{} sent EOF'.format(*self.address))

    def connection_lost(self, error=""):
        self.log.debug('{}:{} disconnected'.format(*self.address))
        self.transport.close()
Upvotes: 1
Views: 2264
Reputation: 155610
I keep getting 2 errors: "RuntimeError: Event loop is running." or "asyncio > Task was destroyed but it is pending!"
As you discovered, asyncio event loops do not nest.
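To make the "do not nest" point concrete, here is a minimal sketch, assuming a recent Python 3 where asyncio.run and asyncio.get_running_loop are available. The names inner and outer are made up for illustration and are not from the question. Calling run_until_complete on a loop that is already running raises RuntimeError, while a plain await of the same coroutine works:

```python
import asyncio


async def inner():
    # Hypothetical stand-in for any coroutine, e.g. a connection attempt.
    return 42


async def outer():
    loop = asyncio.get_running_loop()
    coro = inner()
    try:
        # The loop is already busy running outer(), so it refuses to be re-entered.
        loop.run_until_complete(coro)
    except RuntimeError as exc:
        coro.close()  # discard the coroutine object that was never run
        nested_error = exc
    else:
        nested_error = None
    # Awaiting hands control back to the running loop instead of re-entering it.
    value = await inner()
    return nested_error, value


nested_error, value = asyncio.run(outer())
print(type(nested_error).__name__, value)
```

The exact error message differs between Python versions, so the sketch only looks at the exception type.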
To remove the nesting, you should define send_to_peers as a coroutine using async def. Inside it, loop.run_until_complete(coro) should be changed to await coro. Once send_to_peers is a coroutine, you can call it:
- from blocking code such as infinite using loop.run_until_complete(send_to_peers(...))
- from async code such as infinite_loop using await send_to_peers(...).
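Put together, the conversion could look roughly like the sketch below. It is not a drop-in replacement for your code: the Client class is a stripped-down stand-in for your tcp.client.Client, the peers parameter (a list of (host, port) pairs) and the send_blocking helper are names I made up for the example, and I am assuming a Python version that has asyncio.get_running_loop.

```python
import asyncio


class Client(asyncio.Protocol):
    """Stand-in for the question's tcp.client.Client, reduced to the essentials."""

    def __init__(self):
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport


async def send_to_peers(data, peers):
    """Connect to each (host, port) peer, send one line, then disconnect."""
    loop = asyncio.get_running_loop()
    for index, (host, port) in enumerate(peers, start=1):
        try:
            # "await coro" replaces "loop.run_until_complete(coro)".
            _, proto = await loop.create_connection(Client, host, port)
        except ConnectionRefusedError as exc:
            print(exc)
            continue
        msg = data + "-" + str(index) + "\n"
        proto.transport.write(msg.encode())
        proto.transport.close()


def send_blocking(data, peers):
    """Hypothetical blocking caller: runs the coroutine on its own event loop."""
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(send_to_peers(data, peers))
    finally:
        loop.close()
```

From async code you would write await send_to_peers("0", peers) instead. send_blocking is only safe to call where no event loop is already running in the current thread.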
In case of infinite_loop, you can implement the timeout using asyncio.wait_for:
try:
    await asyncio.wait_for(send_to_peers(str(index)), 10)
except asyncio.TimeoutError:
    # ... timeout ...
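For the "every 10 seconds" part, one possible shape for the whole loop is sketched below. The run_periodically helper and its parameters (job, rounds, period, timeout) are my own naming, not part of asyncio, and recording the timed-out rounds is just a placeholder for whatever timeout handling you need. Note that the pause starts after each round finishes, so rounds begin slightly more than period seconds apart.

```python
import asyncio


async def run_periodically(job, rounds, period, timeout):
    """Await job(index) once per round, capping each run at `timeout` seconds.

    Returns the indices of the rounds that timed out.
    """
    timed_out = []
    for index in range(rounds):
        try:
            # wait_for cancels the job if it takes longer than `timeout`.
            await asyncio.wait_for(job(index), timeout)
        except asyncio.TimeoutError:
            timed_out.append(index)
        # Pause before the next round without blocking the event loop,
        # so a server started with loop.create_server() keeps running.
        await asyncio.sleep(period)
    return timed_out
```

In your case job would be something like lambda index: send_to_peers(str(index)) with period=10 and timeout=10, and the range would become a while True loop.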
Upvotes: 1