Reputation: 11
In a .py module I created this function:
from utils.telegram_utils import telegram_message
...
# Module-level progress counter; each call to the function below advances it once.
counter = count(1)


def open_websites_with_progress(url):
    """Process ``url`` with ``open_websites`` and report progress at milestones.

    After every processed URL the shared ``counter`` is advanced. When the new
    count is one of ``milestones``, a completion percentage is logged and, if
    this module is being run as a script, also sent through
    ``telegram_message``.

    Returns:
        ``None`` when ``open_websites`` reported ``'TimeoutError'``,
        ``'GoToError'`` or ``'MatchNotFound'`` (each is logged); otherwise
        whatever ``open_websites`` returned.
    """
    risultato = open_websites(url)

    done = next(counter)
    if done in milestones:
        pct = (done / total_urls) * 100
        message = f"{pct:.0f}% completato ({done}/{total_urls})"
        logger.info(message)
        if __name__ == '__main__':
            print("PROVA")
            # asyncio.run() starts a fresh event loop for this single call and
            # closes it again before returning.
            asyncio.run(telegram_message(message))

    # Guard clauses: each error sentinel is logged and mapped to None.
    if risultato == 'TimeoutError':
        logger.warning(f"TimeoutError: xxxxx{url}")
        return None
    if risultato == 'GoToError':
        logger.error(f"GoToError: xxxxx{url}")
        return None
    if risultato == 'MatchNotFound':
        logger.warning(f"MatchNotFound: xxxxx{url}")
        return None
    return risultato
# Run the wrapper over all URLs on three worker threads. Executor.map()
# returns an iterator; results (and any worker exception) surface when it
# is iterated.
with ThreadPoolExecutor(max_workers=3) as executor_2:
    results = executor_2.map(open_websites_with_progress, combined_total_urls)
The problem arises in a function imported from another module, utils/telegram_utils.py:
async def telegram_message(message):
    """Send ``message`` as text to the chat ``GROUP_CHAT_ID`` via ``bot``."""
    await bot.send_message(text=message, chat_id=GROUP_CHAT_ID)
Called normally through asyncio this function works, but when it is invoked from inside a ThreadPoolExecutor it causes problems: the first call works and the first message is sent through the bot, but from the second call onward it stops working and reports the following error:
PROVA
PROVA
PROVA
Traceback (most recent call last):
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\request\_baserequest.py", line 333, in _request_wrapper
code, payload = await self.do_request(
^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\request\_httpxrequest.py", line 292, in do_request
res = await self._client.request(
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpx\_client.py", line 1585, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpx\_client.py", line 1674, in send
response = await self._send_handling_auth(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpx\_client.py", line 1702, in _send_handling_auth
response = await self._send_handling_redirects(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpx\_client.py", line 1739, in _send_handling_redirects
response = await self._send_single_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpx\_client.py", line 1776, in _send_single_request
response = await transport.handle_async_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpx\_transports\default.py", line 377, in handle_async_request
resp = await self._pool.handle_async_request(req)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpcore\_async\connection_pool.py", line 216, in handle_async_request
raise exc from None
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpcore\_async\connection_pool.py", line 189, in handle_async_request
await self._close_connections(closing)
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpcore\_async\connection_pool.py", line 305, in _close_connections
await connection.aclose()
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpcore\_async\connection.py", line 171, in aclose
await self._connection.aclose()
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpcore\_async\http11.py", line 265, in aclose
await self._network_stream.aclose()
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpcore\_backends\anyio.py", line 55, in aclose
await self._stream.aclose()
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\anyio\streams\tls.py", line 202, in aclose
await self.transport_stream.aclose()
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\anyio\_backends\_asyncio.py", line 1258, in aclose
self._transport.close()
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\asyncio\proactor_events.py", line 109, in close
self._loop.call_soon(self._call_connection_lost, None)
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\asyncio\base_events.py", line 795, in call_soon
self._check_closed()
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\asyncio\base_events.py", line 541, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\Users\lucag\Desktop\Exchange-main\Exchange-main\betfair.py", line 190, in <module>
betfair_scraper()
File "C:\Users\lucag\Desktop\Exchange-main\Exchange-main\betfair.py", line 176, in betfair_scraper
for result in results:
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\concurrent\futures\_base.py", line 619, in result_iterator
yield _result_or_cancel(fs.pop())
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\concurrent\futures\_base.py", line 317, in _result_or_cancel
return fut.result(timeout)
^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\concurrent\futures\_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\concurrent\futures\_base.py", line 401, in __get_result
raise self._exception
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\concurrent\futures\thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\Desktop\Exchange-main\Exchange-main\betfair.py", line 157, in open_websites_with_progress
asyncio.run(telegram_message(message))
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\asyncio\runners.py", line 194, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\asyncio\runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\asyncio\base_events.py", line 687, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "C:\Users\lucag\Desktop\Exchange-main\Exchange-main\utils\telegram_utils.py", line 10, in telegram_message
await bot.send_message(chat_id=GROUP_CHAT_ID, text=message)
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\_bot.py", line 1029, in send_message
return await self._send_message(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\_bot.py", line 745, in _send_message
result = await self._post(
^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\_bot.py", line 623, in _post
return await self._do_post(
^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\_bot.py", line 652, in _do_post
result = await request.post(
^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\request\_baserequest.py", line 201, in post
result = await self._request_wrapper(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\request\_baserequest.py", line 345, in _request_wrapper
raise NetworkError(f"Unknown error in HTTP implementation: {exc!r}") from exc
telegram.error.NetworkError: Unknown error in HTTP implementation: RuntimeError('Event loop is closed')
I tried to create an event loop up front and close it at the end, but I could not get it to work and I am not sure how to do it correctly.
Upvotes: 0
Views: 68
Reputation: 43993
It is very difficult to determine why you are getting RuntimeError: Event loop is closed, because what you have posted is considerably less than a minimal, reproducible example. But I observe that every time you execute asyncio.run(telegram_message(message))
you are creating a new event loop, which is inefficient. Could it also be the cause of your error? I don't know, but perhaps the event loop is being handed to some other thread that then tries to run an async function on it after asyncio.run
has completed and closed the event loop. The following code demonstrates that situation:
import asyncio
from threading import Thread
import time
async def main():
    """Print a marker, then pass the running event loop to a new thread."""
    print('main')
    running_loop = asyncio.get_running_loop()
    worker = Thread(target=foo, args=(running_loop,))
    worker.start()
def foo(event_loop):
    """Wait half a second, then try to run ``bar()`` on ``event_loop``.

    Runs in a separate thread. The delay is there so that the loop passed in
    has already been closed by the time the coroutine is submitted.
    """
    time.sleep(.5)
    # The event loop is already closed:
    pending = asyncio.run_coroutine_threadsafe(bar(), event_loop)
    pending.result()
async def bar():
    """Coroutine whose only effect is printing ``'bar'``."""
    print('bar')
# Run main() to completion. asyncio.run() closes its event loop on return,
# which normally happens before foo() wakes from its 0.5 s sleep.
asyncio.run(main())
So try the following, with no guarantee that it will resolve your problem: give each thread in the pool its own event loop, created once when the pool is initialized and closed when the pool's threads terminate. Specify a pool initializer function that creates a new event loop for each thread and saves it in thread-local storage. The event loop is wrapped in an instance of a trivial class that closes the loop when the instance is garbage collected at thread termination:
from threading import local
class Loop:
    """Own one asyncio event loop on behalf of the thread that creates it.

    Instantiating the class creates a new event loop and installs it as the
    current loop of the calling thread. Calling the instance returns that
    loop. The loop is closed by ``close()`` or, as a fallback, when the
    instance is garbage collected.
    """

    def __init__(self):
        self._loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._loop)

    def __call__(self):
        """Return the wrapped event loop."""
        return self._loop

    def close(self):
        """Close the wrapped loop; calling this more than once is harmless.

        Raises:
            RuntimeError: if the loop is still running (raised by
                ``loop.close()`` itself).
        """
        # getattr: the attribute is missing if __init__ failed part-way.
        loop = getattr(self, "_loop", None)
        if loop is not None and not loop.is_closed():
            loop.close()

    def __del__(self):
        # A finalizer can run on a half-built instance and must not raise:
        # skip cleanup when there is no loop, and leave a running loop alone
        # because close() would raise RuntimeError on it.
        loop = getattr(self, "_loop", None)
        if loop is not None and not loop.is_running():
            self.close()
# Thread-local storage: every thread sees its own ``loop`` attribute here.
my_local = local()


def init_pool():
    """Pool initializer: give the calling pool thread its own ``Loop``.

    Intended to be passed as ``initializer=`` to the thread pool so that it
    runs once in each worker thread.
    """
    my_local.loop = Loop()
from utils.telegram_utils import telegram_message
...
counter = count(1)


def open_websites_with_progress(url):
    """Progress wrapper that sends its message on the thread's own event loop.

    This is a sketch: the ``...`` stands for code that is not shown here,
    including the code that defines ``progress``.
    """
    ...
    if progress in milestones:
        percentage = (progress / total_urls) * 100
        message = f"{percentage:.0f}% completato ({progress}/{total_urls})"
        # logger.info(message)  # rda
        if __name__ == '__main__':
            print("PROVA")
            # Fetch the event loop stored for this thread in my_local and
            # run the coroutine on it, instead of calling asyncio.run().
            thread_loop = my_local.loop()
            thread_loop.run_until_complete(telegram_message(message))
...
# init_pool runs once in each of the three worker threads before it takes work.
with ThreadPoolExecutor(max_workers=3, initializer=init_pool) as executor_2:
    # Iterate the iterator to ensure all submitted tasks have completed
    results = list(executor_2.map(open_websites_with_progress, combined_total_urls))
If this does not resolve the issue, then next try removing the __del__ method from class Loop.
Upvotes: 0