Reputation: 1
I am trying to create a mouse hook in Python to process mouse events. My goal is to implement a mechanism where mouse events are processed asynchronously with a timeout, allowing the application to pass the event to the next hook if processing takes too long.
Currently, the mouse hook works initially, but after moving the mouse around for a few seconds, the app stops processing messages. As a result, mouse events are blocked until I terminate the Python app. I suspect the issue is related to how I'm managing the message loop or handling events in my asyncio callback. Also, when the hook stops processing messages, there is no error log indicating that some issue has occured.
This is the code:
import asyncio
import ctypes
import logging
import multiprocessing
from ctypes import wintypes
from dataclasses import dataclass
from colorama import Fore, Style
logger = logging.getLogger(__name__)
@dataclass
class MouseEvent:
x: int
y: int
dx: int = 0
dy: int = 0
# Windows-specific constants
WH_MOUSE_LL = 14
WM_MOUSEMOVE = 0x0200
PM_REMOVE = 0x0001
CallNextHookEx = ctypes.windll.user32.CallNextHookEx
user32 = ctypes.windll.user32
class MouseHook:
def __init__(self, pipe: multiprocessing.Process):
self.pipe = pipe
self.hook_id = None
self._prev_x = None
self._prev_y = None
def _update_position(self, x, y) -> MouseEvent:
"""Calculate dx, dy and create a MouseEvent object."""
dx, dy = 0, 0
prev_x = self._prev_x if self._prev_x is not None else x
prev_y = self._prev_y if self._prev_y is not None else y
logger.debug(
f"{Fore.YELLOW}Mouse moved to: ({x}, {y}), dx: {x - prev_x}, dy: {y - prev_y}{Style.RESET_ALL}"
)
dx = x - prev_x
dy = y - prev_y
# Update previous coordinates
self._prev_x, self._prev_y = x, y
# Return the MouseEvent object
return MouseEvent(x=x, y=y, dx=dx, dy=dy)
def _low_level_mouse_proc(self, nCode, wParam, lParam):
# Block mouse movement if disable_mouse is set
if nCode >= 0:
# Send the event to the main process
self.pipe.send((nCode, wParam, lParam))
response = self.pipe.recv()
if response == "continue":
return CallNextHookEx(
ctypes.c_void_p(None),
ctypes.c_int(nCode),
wintypes.WPARAM(wParam),
wintypes.LPARAM(lParam),
)
elif response == "block":
return 1 # Block the event
else:
print(f"Unexpected response: {response}")
# Timeout occurred, let the OS process the event
return CallNextHookEx(
ctypes.c_void_p(None),
ctypes.c_int(nCode),
wintypes.WPARAM(wParam),
wintypes.LPARAM(lParam),
)
def run_message_loop(self):
"""Run the message loop asynchronously to avoid blocking."""
msg = wintypes.MSG()
self.is_running = True
while self.is_running:
if user32.PeekMessageA(ctypes.byref(msg), None, 0, 0, PM_REMOVE):
user32.TranslateMessage(ctypes.byref(msg))
user32.DispatchMessageW(ctypes.byref(msg))
def register_hook(self):
"""Register the mouse hook."""
LowLevelMouseProc = ctypes.WINFUNCTYPE(
ctypes.c_int, ctypes.c_int, ctypes.c_int, ctypes.c_void_p
)
self.mouse_proc = LowLevelMouseProc(self._low_level_mouse_proc)
self.hook_id = user32.SetWindowsHookExA(WH_MOUSE_LL, self.mouse_proc, None, 0)
try:
# Start the message loop
self.run_message_loop()
finally:
self.stop_hook()
def stop_hook(self):
"""Stop the mouse hook."""
print('Stopping mouse hook...')
if self.hook_id:
user32.UnhookWindowsHookEx(self.hook_id)
self.hook_id = None
self.is_running = False
async def callback(event):
print(f"Event received: {event}")
return False
# Main application logic
async def main():
# Create a pipe for IPC
parent_conn, child_conn = multiprocessing.Pipe()
mouse_hook = MouseHook(child_conn)
# Start the hook manager process
hook_process = multiprocessing.Process(target=mouse_hook.register_hook)
hook_process.start()
try:
while True:
# Listen for events from the hook manager
if parent_conn.poll():
event = parent_conn.recv()
try:
# Run the async callback with a timeout
result = await asyncio.wait_for(callback(event), timeout=0.01)
if result:
parent_conn.send("block")
else:
parent_conn.send("continue")
except asyncio.TimeoutError:
print("Callback exceeded time limit; continuing without waiting.")
finally:
# Send "continue" to allow the OS to process the event
parent_conn.send("continue")
except KeyboardInterrupt:
pass
finally:
# Clean up
hook_process.terminate()
hook_process.join()
if __name__ == "__main__":
asyncio.run(main())
Here’s what I’ve done so far:
Despite these efforts, the hook appears to block after continuous mouse movement.
Upvotes: 0
Views: 21