Reputation: 1235
I have a async function and need to run in with apscheduller every N minutes. There is a python code below
URL_LIST = ['<url1>',
'<url2>',
'<url2>',
]
def demo_async(urls):
"""Fetch list of web pages asynchronously."""
loop = asyncio.get_event_loop() # event loop
future = asyncio.ensure_future(fetch_all(urls)) # tasks to do
loop.run_until_complete(future) # loop until done
async def fetch_all(urls):
tasks = [] # dictionary of start times for each url
async with ClientSession() as session:
for url in urls:
task = asyncio.ensure_future(fetch(url, session))
tasks.append(task) # create list of tasks
_ = await asyncio.gather(*tasks) # gather task responses
async def fetch(url, session):
"""Fetch a url, using specified ClientSession."""
async with session.get(url) as response:
resp = await response.read()
print(resp)
if __name__ == '__main__':
scheduler = AsyncIOScheduler()
scheduler.add_job(demo_async, args=[URL_LIST], trigger='interval', seconds=15)
scheduler.start()
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
# Execution will block here until Ctrl+C (Ctrl+Break on Windows) is pressed.
try:
asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
pass
But when i tried to run it i have the next error info
Job "demo_async (trigger: interval[0:00:15], next run at: 2017-10-12 18:21:12 +04)" raised an exception.....
..........\lib\asyncio\events.py", line 584, in get_event_loop
% threading.current_thread().name)
RuntimeError: There is no current event loop in thread '<concurrent.futures.thread.ThreadPoolExecutor object at 0x0356B150>_0'.
Could you please help me with this? Python 3.6, APScheduler 3.3.1,
Upvotes: 123
Views: 250062
Reputation: 9
In my case the line was like this
asyncio.get_event_loop().run_until_complete(test())
I replaced above line with this line which solved my problem
asyncio.run(test())
Upvotes: -1
Reputation: 351
I had a similar issue where I wanted my asyncio module to be callable from a non-asyncio script (which was running under gevent... don't ask...). The code below resolved my issue because it tries to get the current event loop, but will create one if there isn't one in the current thread. Tested in python 3.9.11.
try:
loop = asyncio.get_event_loop()
except RuntimeError as e:
if str(e).startswith('There is no current event loop in thread'):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
else:
raise
Upvotes: 20
Reputation: 1962
Reading given answers I only manage to fix my websocket thread by using the hint (try replacing) in https://stackoverflow.com/a/46750562/598513 on this page.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
The documentation of BaseDefaultEventLoopPolicy
explains
Default policy implementation for accessing the event loop. In this policy, each thread has its own event loop. However, we only automatically create an event loop by default for the main thread; other threads by default have no event loop.
So when using a thread one has to create the loop.
And I had to reorder my code so my final code
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# !!! Place code after setting the loop !!!
server = Server()
start_server = websockets.serve(server.ws_handler, 'localhost', port)
Upvotes: 5
Reputation: 916
Since this question continues to appear on the first page, I will write my problem and my answer here.
I had a RuntimeError: There is no current event loop in thread 'Thread-X'.
when using flask-socketio and Bleak.
Edit: well, I refactored my file and made a class.
I initialized the loop in the constructor, and now everything is working fine:
class BLE:
def __init__(self):
self.loop = asyncio.get_event_loop()
# function example, improvement of
# https://github.com/hbldh/bleak/blob/master/examples/discover.py :
def list_bluetooth_low_energy(self) -> list:
async def run() -> list:
BLElist = []
devices = await bleak.discover()
for d in devices:
BLElist.append(d.name)
return 'success', BLElist
return self.loop.run_until_complete(run())
Usage:
ble = path.to.lib.BLE()
list = ble.list_bluetooth_low_energy()
Original answer:
The solution was stupid. I did not pay attention to what I did, but I moved some import
out of a function, like this:
import asyncio, platform
from bleak import discover
def listBLE() -> dict:
async def run() -> dict:
# my code that keep throwing exceptions.
loop = asyncio.get_event_loop()
ble_list = loop.run_until_complete(run())
return ble_list
So I thought that I needed to change something in my code, and I created a new event loop using this piece of code just before the line with get_event_loop()
:
loop = asyncio.new_event_loop()
loop = asyncio.set_event_loop()
At this moment I was pretty happy, since I had a loop running.
But not responding. And my code relied on a timeout to return some values, so it was pretty bad for my app.
It took me nearly two hours to figure out that the problem was the import
, and here is my (working) code:
def list() -> dict:
import asyncio, platform
from bleak import discover
async def run() -> dict:
# my code running perfectly
loop = asyncio.get_event_loop()
ble_list = loop.run_until_complete(run())
return ble_list
Upvotes: 4
Reputation: 15573
Use asyncio.run()
instead of directly using the event loop.
It creates a new loop and closes it when finished.
This is how the 'run' looks like:
if events._get_running_loop() is not None:
raise RuntimeError(
"asyncio.run() cannot be called from a running event loop")
if not coroutines.iscoroutine(main):
raise ValueError("a coroutine was expected, got {!r}".format(main))
loop = events.new_event_loop()
try:
events.set_event_loop(loop)
loop.set_debug(debug)
return loop.run_until_complete(main)
finally:
try:
_cancel_all_tasks(loop)
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
events.set_event_loop(None)
loop.close()
Upvotes: 7
Reputation: 3118
The important thing that hasn't been mentioned is why the error occurs. For me personally, knowing why the error occurs is as important as solving the actual problem.
Let's take a look at the implementation of the get_event_loop
of BaseDefaultEventLoopPolicy
:
class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
...
def get_event_loop(self):
"""Get the event loop.
This may be None or an instance of EventLoop.
"""
if (self._local._loop is None and
not self._local._set_called and
isinstance(threading.current_thread(), threading._MainThread)):
self.set_event_loop(self.new_event_loop())
if self._local._loop is None:
raise RuntimeError('There is no current event loop in thread %r.'
% threading.current_thread().name)
return self._local._loop
You can see that the self.set_event_loop(self.new_event_loop())
is only executed if all of the below conditions are met:
self._local._loop is None
- _local._loop
is not setnot self._local._set_called
- set_event_loop
hasn't been called yetisinstance(threading.current_thread(), threading._MainThread)
- current thread is the main one (this is not True in your case)Therefore the exception is raised, because no loop is set in the current thread:
if self._local._loop is None:
raise RuntimeError('There is no current event loop in thread %r.'
% threading.current_thread().name)
Upvotes: 51
Reputation: 2209
In your def demo_async(urls)
, try to replace:
loop = asyncio.get_event_loop()
with:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
Upvotes: 216
Reputation: 5911
Just pass fetch_all
to scheduler.add_job()
directly. The asyncio scheduler supports coroutine functions as job targets.
If the target callable is not a coroutine function, it will be run in a worker thread (due to historical reasons), hence the exception.
Upvotes: 17