Gabriele
Gabriele

Reputation: 11

ERROR: Event loop is closed in ThreadPoolExecutor

In a .py module I created this function


    from utils.telegram_utils import telegram_message

    ...

    counter = count(1)

    def open_websites_with_progress(url):
        """Open one URL, report progress at milestones, and filter error results.

        Calls ``open_websites(url)`` and takes the next value from the
        enclosing-scope ``counter``. When that value is in ``milestones``, a
        completion percentage is logged and, if this module is run as a
        script, also sent through ``telegram_message``.

        Returns the value produced by ``open_websites`` unless it equals one
        of the strings 'TimeoutError', 'GoToError' or 'MatchNotFound'; those
        are logged and ``None`` is returned instead.
        """
        outcome = open_websites(url)
        progress = next(counter)

        if progress in milestones:
            percentage = (progress / total_urls) * 100
            message = f"{percentage:.0f}% completato ({progress}/{total_urls})"
            logger.info(message)

            if __name__ == '__main__':
                print("PROVA")
                # asyncio.run() creates a new event loop for this call and
                # closes it again once the coroutine has finished.
                asyncio.run(telegram_message(message))

        # Each error marker is logged and swallowed (the caller gets None).
        if outcome == 'TimeoutError':
            logger.warning(f"TimeoutError: xxxxx{url}")
            return None

        if outcome == 'GoToError':
            logger.error(f"GoToError: xxxxx{url}")
            return None

        if outcome == 'MatchNotFound':
            logger.warning(f"MatchNotFound: xxxxx{url}")
            return None

        return outcome

    # Run open_websites_with_progress over combined_total_urls using a pool of
    # at most three worker threads.
    with ThreadPoolExecutor(max_workers=3) as executor_2:
        # map() returns an iterator over the results; an exception raised in a
        # worker is re-raised when its result is retrieved from that iterator.
        results = executor_2.map(open_websites_with_progress, combined_total_urls)

The problem occurs in a function that is imported from another module, utils/telegram_utils.py:

async def telegram_message(message):
    """Send ``message`` as the text of a bot message to ``GROUP_CHAT_ID``."""
    await bot.send_message(chat_id=GROUP_CHAT_ID, text=message)

This function normally works when it is run with asyncio, but when it is called from inside a ThreadPoolExecutor it gives me problems: the first time it works (the first 'print' is sent through the bot), but the second time it stops working and reports the following error:

PROVA
PROVA
PROVA
Traceback (most recent call last):
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\request\_baserequest.py", line 333, in _request_wrapper
    code, payload = await self.do_request(
                    ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\request\_httpxrequest.py", line 292, in do_request
    res = await self._client.request(
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpx\_client.py", line 1585, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpx\_client.py", line 1674, in send
    response = await self._send_handling_auth(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpx\_client.py", line 1702, in _send_handling_auth
    response = await self._send_handling_redirects(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpx\_client.py", line 1739, in _send_handling_redirects
    response = await self._send_single_request(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpx\_client.py", line 1776, in _send_single_request
    response = await transport.handle_async_request(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpx\_transports\default.py", line 377, in handle_async_request
    resp = await self._pool.handle_async_request(req)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpcore\_async\connection_pool.py", line 216, in handle_async_request
    raise exc from None
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpcore\_async\connection_pool.py", line 189, in handle_async_request
    await self._close_connections(closing)
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpcore\_async\connection_pool.py", line 305, in _close_connections
    await connection.aclose()
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpcore\_async\connection.py", line 171, in aclose
    await self._connection.aclose()
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpcore\_async\http11.py", line 265, in aclose
    await self._network_stream.aclose()
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\httpcore\_backends\anyio.py", line 55, in aclose
    await self._stream.aclose()
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\anyio\streams\tls.py", line 202, in aclose
    await self.transport_stream.aclose()
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\anyio\_backends\_asyncio.py", line 1258, in aclose
    self._transport.close()
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\asyncio\proactor_events.py", line 109, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\asyncio\base_events.py", line 795, in call_soon
    self._check_closed()
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\asyncio\base_events.py", line 541, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\lucag\Desktop\Exchange-main\Exchange-main\betfair.py", line 190, in <module>
    betfair_scraper()
  File "C:\Users\lucag\Desktop\Exchange-main\Exchange-main\betfair.py", line 176, in betfair_scraper
    for result in results:
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\concurrent\futures\_base.py", line 619, in result_iterator
    yield _result_or_cancel(fs.pop())
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\concurrent\futures\_base.py", line 317, in _result_or_cancel
    return fut.result(timeout)
           ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\concurrent\futures\_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\concurrent\futures\_base.py", line 401, in __get_result
    raise self._exception
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\concurrent\futures\thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\Desktop\Exchange-main\Exchange-main\betfair.py", line 157, in open_websites_with_progress
    asyncio.run(telegram_message(message))
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\asyncio\runners.py", line 194, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\asyncio\runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\asyncio\base_events.py", line 687, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "C:\Users\lucag\Desktop\Exchange-main\Exchange-main\utils\telegram_utils.py", line 10, in telegram_message
    await bot.send_message(chat_id=GROUP_CHAT_ID, text=message)
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\_bot.py", line 1029, in send_message
    return await self._send_message(
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\_bot.py", line 745, in _send_message
    result = await self._post(
             ^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\_bot.py", line 623, in _post
    return await self._do_post(
           ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\_bot.py", line 652, in _do_post
    result = await request.post(
             ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\request\_baserequest.py", line 201, in post
    result = await self._request_wrapper(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\lucag\AppData\Local\Programs\Python\Python312\Lib\site-packages\telegram\request\_baserequest.py", line 345, in _request_wrapper
    raise NetworkError(f"Unknown error in HTTP implementation: {exc!r}") from exc
telegram.error.NetworkError: Unknown error in HTTP implementation: RuntimeError('Event loop is closed')

I tried to create an event loop up front and close it afterwards, but I could not get it to work, and I am not sure how to do it correctly.

Upvotes: 0

Views: 68

Answers (1)

Booboo
Booboo

Reputation: 43993

It is very difficult to determine why you are getting RuntimeError: Event loop is closed, because what you have posted is considerably less than a minimal, reproducible example. But I observe that every time you execute asyncio.run(telegram_message(message)) you are creating a new event loop, which is closed again when asyncio.run returns. This is inefficient. Could it also be the cause of your error? I don't know, but perhaps the event loop is being passed to some other thread that then tries to run an async function on it after asyncio.run has completed and the event loop has been closed. The following is an example of code that demonstrates this issue:

import asyncio
from threading import Thread
import time

async def main():
    """Print a marker, then pass the running event loop to a new thread.

    The thread runs ``foo`` with the loop as its only argument; this
    coroutine returns without waiting for that thread.
    """
    print('main')
    running_loop = asyncio.get_running_loop()
    worker = Thread(target=foo, args=(running_loop,))
    worker.start()

def foo(event_loop):
    """Sleep for half a second, then submit ``bar()`` to ``event_loop``.

    Runs in a separate thread and blocks on the submitted coroutine's result.
    """
    time.sleep(0.5)
    # The event loop is already closed:
    pending = asyncio.run_coroutine_threadsafe(bar(), event_loop)
    pending.result()

async def bar():
    """Coroutine that only prints 'bar'."""
    print('bar')

asyncio.run(main())

So try the following, with no guarantee that it will resolve your problem: give each thread in the pool its own event loop that is created once when the pool is initialized and closed when the pool's threads terminate. Specify a pool initializer function that creates a new event loop for each thread and saves it in thread-local storage. The event loop is wrapped in an instance of a trivial class whose __del__ method closes the loop when the instance is garbage collected at thread termination:

from threading import local

class Loop:
    """Wrapper that owns one asyncio event loop.

    Constructing an instance creates a new event loop and sets it as the
    current event loop for the calling thread. Calling the instance returns
    that loop. The loop is closed when the instance is finalized.
    """

    def __init__(self):
        # Create the loop first, then register it as the current loop.
        self._loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._loop)

    def __call__(self):
        """Return the wrapped event loop."""
        return self._loop

    def __del__(self):
        # Close the wrapped loop when this wrapper is being destroyed.
        self._loop.close()

my_local = local()

def init_pool():
    """Create a new ``Loop`` and store it on the thread-local ``my_local``.

    Intended as the ``initializer`` of the thread pool, so that each pool
    thread gets its own event loop under ``my_local.loop``.
    """
    my_local.loop = Loop()

from utils.telegram_utils import telegram_message

    ...
    
    counter = count(1)
    
    def open_websites_with_progress(url):
        # NOTE(review): the `...` lines presumably stand for the parts of the
        # original function that are left unchanged -- confirm against the question.
        ...
    
        if progress in milestones:
            percentage = (progress / total_urls) * 100
    
            message = str(f"{percentage:.0f}% completato ({progress}/{total_urls})")
    
            #logger.info(message)  #rda
            if __name__ == '__main__':
                print("PROVA")
                # Fetch the event loop stored in thread-local storage by
                # init_pool and run the coroutine on it, instead of creating
                # a new loop with asyncio.run().
                loop = my_local.loop()
                loop.run_until_complete(telegram_message(message))
                
        ...
    
    with ThreadPoolExecutor(max_workers=3, initializer=init_pool) as executor_2:
        # Iterate the iterator to ensure all submitted tasks have completed
        results = list(executor_2.map(open_websites_with_progress, combined_total_urls))

If this does not resolve the issue, then next try removing the __del__ method from class Loop.

Upvotes: 0

Related Questions