Reputation: 71
I wrote a wrapper for the Corsair Utility Engine SDK, but there is one function that I have not been able to wrap. It's an async function that accepts a callback function, but I cannot seem to figure out how to give it that callback.
The function looks like this:
bool CorsairSetLedsColorsAsync(int size, CorsairLedColor* ledsColors, void (*CallbackType)(void* context, bool result, CorsairError error), void *context)
These are the implementations that I have tried so far:
def SetLedsColorsAsync(self, size, led_color, callback, context):
c_func = CFUNCTYPE(c_void_p, c_void_p, c_bool, c_int)
c_callback = c_func(callback)
self._libcue.CorsairSetLedsColorsAsync.restype = c_bool
self._libcue.CorsairSetLedsColorsAsync.argtypes = [c_int, POINTER(CorsairLedColor), c_void_p, c_void_p]
return self._libcue.CorsairSetLedsColorsAsync(size, led_color, c_callback, context)
as well as
def SetLedsColorsAsync(self, size, led_color, callback, context):
c_func = CFUNCTYPE(None, c_void_p, c_bool, c_int)
c_callback = c_func(callback)
self._libcue.CorsairSetLedsColorsAsync.restype = c_bool
self._libcue.CorsairSetLedsColorsAsync.argtypes = [c_int, POINTER(CorsairLedColor), c_func, c_void_p]
return self._libcue.CorsairSetLedsColorsAsync(size, led_color, c_callback, context)
The code I'm testing with is
from cue_sdk import *
import time
def test(context, result, error):
print context, result, error
return 0
Corsair = CUE("CUESDK.x64_2013.dll")
Corsair.RequestControl(CAM_ExclusiveLightingControl)
Corsair.SetLedsColorsAsync(1, CorsairLedColor(CLK_H, 255, 255, 255), test, 1)
while True:
time.sleep(1)
The time.sleep()
is there just to keep the program alive.
When running it, it crashes with error code 3221225477
(STATUS_ACCESS_VIOLATION
) on Windows.
If you need to see the actual wrapper, you can find it here.
Upvotes: 2
Views: 1638
Reputation: 71
I completely forgot about the issue with garbage collection until eryksun reminded me. He suggested that I create a permanent callback handler that would store all of the callbacks and call + pop them when necessary. This is what I did.
The function prototype looks like this:
self._callback_type = CFUNCTYPE(None, c_void_p, c_bool, c_int)
self._callback = self._callback_type(self._callback_handler)
self._libcue.CorsairSetLedsColorsAsync.restype = c_bool
self._libcue.CorsairSetLedsColorsAsync.argtypes = [c_int, POINTER(CorsairLedColor), self._callback_type, c_void_p]
The _callback_handler
function looks like this:
def _callback_handler(self, context, result, error):
if context is not None and context in self._callbacks:
self._callbacks.pop(context)(context, result, error)
The actual function looks like this.
def SetLedsColorsAsync(self, size, led_color, callback=None, context=None):
if callback:
if context is None:
context = id(callback)
self._callbacks[context] = callback
return self._libcue.CorsairSetLedsColorsAsync(size, led_color, self._callback, context)
_callback_type
is the actual CFUNCTYPE that wraps the permanent callback (_callback_handler
) which is one of the prototype's argtype. When SetLedsColorsAsync
is called, the callback
parameter is put into a dictionary (with the context or the ID of the function being the key). Instead of supplying the callback into the function, the permanent callback is passed on instead. Once the permanent callback is called, it will call the proper function and remove it from the dictionary.
The test I used:
#!python3
import time
from cue_sdk import *
def test(context, result, error):
print(context, result, error)
assert context == id(test)
Corsair = CUE("CUESDK.x64_2013.dll")
Corsair.RequestControl(CAM_ExclusiveLightingControl)
Corsair.SetLedsColorsAsync(1, CorsairLedColor(CLK_H, 255, 255, 255), test)
while True:
time.sleep(1)
Example output:
2969710418936 True 0
If I'm not making sense, the commit is here.
Upvotes: 1