Reputation: 1
I'm trying to make a code template which sends a heartbeat to some external things by ZMQ on win32 and linux.
On win32 I must use the asyncio.WindowsSelectorEventLoopPolicy() because of zmq and zmq.asyncio, however this seems to break the ctrl-c task cancelling behavior if there's an asyncio.await in my code. (The tasks dict is a global one because I want to access it from some tasks too).
If I leave out the asyncio.WindowsSelectorEventLoopPolicy() then the ctrl-c behavior in association with asyncio.wait works. So the culprit is definitely this loop selector.
How can I change my code so I have the best of both worlds. I.e. and using the selectoreventloop (or a loop which is compatible with zmq) and have a correct ctrl-c behavior ?
Below is my complete test code (python 3.10.9) :
# TEST CODE
import asyncio
import nest_asyncio
import zmq, zmq.asyncio
import sys
from colorama import Fore, Back, Style, init
import pprint
import signal
def signal_handler(signal, frame):
print(f"Signal handler called")
raise KeyboardInterrupt
async def heartbeat(pub, mqtt_Q, mySQL_Q, influx_Q, exit_event):
global counter
try:
while not exit_event.is_set():
counter += 1
print (f"Heartbeat {counter:3} send")
# // Logic to send heartbeats here
for _ in range(10):
await asyncio.sleep(1)
except asyncio.CancelledError:
print (f"Cleanup here")
async def main():
try:
task_functions = {
"Heartbeat": heartbeat(zmq_publish, mqtt_Q, mySQL_Q, influx_Q, exit_event),
}
for taskname, coro in task_functions.items():
tasks[taskname] = asyncio.create_task(coro)
print(f"\n<DEBUG LINE BELOW>\n")
pprint.pp(tasks)
print(f"\n")
await asyncio.gather(*tasks.values())
except KeyboardInterrupt:
print(f"Keyboard interrupt !!")
finally:
# Clean up tasks on interrupt
for task in asyncio.all_tasks():
task.cancel()
# Ensure all tasks are cancelled and cleaned up
await asyncio.gather(*tasks.values(), return_exceptions=True)
# <==== SETUP BELOW ====>
init(True)
myIdCode = 'dmp-001'
zmq_publish = 'placeholder'
counter = 0
mqtt_Q = asyncio.Queue()
mySQL_Q = asyncio.Queue()
influx_Q = asyncio.Queue()
exit_event = asyncio.Event()
tasks = {}
if sys.platform != "win32":
import uvloop
signal.signal(signal.SIGTERM, signal_handler)
else:
import winloop
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())`
At first I used
await asyncio.sleep(10) but i changed it to
for _ in range(10):
await asyncio.sleep(1)
to have a better responsiveness, however, this is a solution which reduces the timeout to 1sec but isn't a real fix.
Upvotes: 0
Views: 17