Hdp 09
Hdp 09

Reputation: 1

cancelling asyncio.wait in combination with set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

I'm trying to make a code template which sends a heartbeat to some external things by ZMQ on win32 and linux.

On win32 I must use the asyncio.WindowsSelectorEventLoopPolicy() because of zmq and zmq.asyncio, however this seems to break the ctrl-c task cancelling behavior if there's an asyncio.await in my code. (The tasks dict is a global one because I want to access it from some tasks too).

If I leave out the asyncio.WindowsSelectorEventLoopPolicy() then the ctrl-c behavior in association with asyncio.wait works. So the culprit is definitely this loop selector.

How can I change my code so I have the best of both worlds. I.e. and using the selectoreventloop (or a loop which is compatible with zmq) and have a correct ctrl-c behavior ?

Below is my complete test code (python 3.10.9) :

# TEST CODE
import asyncio
import nest_asyncio
import zmq, zmq.asyncio
import sys
from colorama import Fore, Back, Style, init
import pprint
import signal

def signal_handler(signal, frame):
    print(f"Signal handler called")
    raise KeyboardInterrupt

async def heartbeat(pub, mqtt_Q, mySQL_Q, influx_Q, exit_event):
    global counter
    try:
        while not exit_event.is_set():
            counter += 1
            print (f"Heartbeat {counter:3} send")
            # // Logic to send heartbeats here

            for _ in range(10):
                await asyncio.sleep(1)

    except asyncio.CancelledError:
        print (f"Cleanup here")

async def main():
    try:
        task_functions = {
            "Heartbeat": heartbeat(zmq_publish, mqtt_Q, mySQL_Q, influx_Q, exit_event),
        }

        for taskname, coro in task_functions.items():
            tasks[taskname] = asyncio.create_task(coro)

        print(f"\n<DEBUG LINE BELOW>\n")
        pprint.pp(tasks)
        print(f"\n")

        await asyncio.gather(*tasks.values())

    except KeyboardInterrupt:
        print(f"Keyboard interrupt !!")

    finally:
        # Clean up tasks on interrupt
        for task in asyncio.all_tasks():
            task.cancel()

        # Ensure all tasks are cancelled and cleaned up
        await asyncio.gather(*tasks.values(), return_exceptions=True)

# <==== SETUP BELOW ====>

init(True)

myIdCode = 'dmp-001'

zmq_publish = 'placeholder'
counter = 0

mqtt_Q = asyncio.Queue()
mySQL_Q = asyncio.Queue()
influx_Q = asyncio.Queue()

exit_event = asyncio.Event()

tasks = {}

if sys.platform != "win32":
    import uvloop
    signal.signal(signal.SIGTERM, signal_handler)
else:
    import winloop
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

asyncio.run(main())`

At first I used

await asyncio.sleep(10) but i changed it to

for _ in range(10):
     await asyncio.sleep(1)

to have a better responsiveness, however, this is a solution which reduces the timeout to 1sec but isn't a real fix.

Upvotes: 0

Views: 17

Answers (0)

Related Questions