Reputation: 47
I am writing a program that aims at wrapping a C code starting a phasemeter acquisition.
This code is supposed to be run on several process at the same time so I use multiprocessing
.
I read nearly all answered questions on SO on this subject but I think i'm missing something. I would be glad if someone could give me a hand on this one.
I have the following error :
wdir='/home/castaing/Documents/LISA/lisa_zifo_monitoring/Python Drivers/Phasemeter/Py_Code') Traceback (most recent call last):
File "/home/castaing/Documents/LISA/lisa_zifo_monitoring/Python Drivers/Phasemeter/Py_Code/DriverPhasemeter.py", line 297, in Process.map(start,Phasemeter.phase_list)
File "/home/castaing/anaconda3/envs/LISA/lib/python3.7/multiprocessing/pool.py", line 268, in map return self._map_async(func, iterable, mapstar, chunksize).get()
File "/home/castaing/anaconda3/envs/LISA/lib/python3.7/multiprocessing/pool.py", line 657, in get raise self._value
File "/home/castaing/anaconda3/envs/LISA/lib/python3.7/multiprocessing/pool.py", line 431, in _handle_tasks put(task)
File "/home/castaing/anaconda3/envs/LISA/lib/python3.7/multiprocessing/connection.py", line 206, in send self._send_bytes(_ForkingPickler.dumps(obj))
File "/home/castaing/anaconda3/envs/LISA/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'CDLL.init.._FuncPtr'
I tried to define a global function to call the phasemeter object's method as you can see below. At the beginning I had a multistart method in the phasemeter object which code was :
def multistart(self) :
with multiprocessing.Pool(len(Phasemeter.phase_list)) as Process :
Process.map(lambda x :x.start,Phasemeter.phase_list)
Here is the code (I only put the parts that seemed relevant to me) :
#%% Initialization
#%% Function definition
#Fix pickle problem ?
def start(Phasemeter_object):
Phasemeter_object.start()
#%% Class definiton
class Phasemeter :
# List of all phasemeters objects, accessed by calling Phasemeter.phase_list
phase_list=[]
#%% Initialization
def __init__(self,*args,**kwargs) :
#%% Robustness. Check type of passed arguments
#%% Path setting, parsing config file
#%% Option handling
#%% Debug, used only if verbose argument is passed in start method
#%% Defining path to shared object file
self.path=os.path.abspath(os.path.join(os.getcwd(),
self.path_to_so_file))
# LIBC is now an object and its method are C Code's functions
self.LIBC = ctypes.CDLL(self.path)
# Settings C library's signature datatype with ctypes data structre tool
# INFO: To see all datas types that can be transmited to C Code
# read ctypes documentation
# First argument is int : argc
# Second argument is string array : argv
# Third is a string : path_to_log_file
self.LIBC.lisaf_phasemeter_main.argtypes= [ctypes.c_int,
ctypes.POINTER(ctypes.c_char_p),
ctypes.c_char_p,]
# Return type is int : Return code
self.LIBC.lisaf_phasemeter_main.restypes = [ctypes.c_int,]
# Add object to phase_list, list used in multistart method
Phasemeter.phase_list.append(self)
#%% Start
def start(self):
#%% Marshalling data for C library
# Create a string array with option list length size
self.c_char_pointer_array = ctypes.c_char_p * len(self.options)
# Encode option list
self.encoded_options = [str.encode(str(i)) for i in self.options ]
# Fill the string array with encoded strings
# REMINDER: C code only understand encoded strings
self.encoded_options = self.c_char_pointer_array (*self.encoded_options)
#%% Calling C library wihth encoded options
# If the logfile option is activated then the encoded
# string is transmited
if self.encoded_path_to_log_file :
self.status = self.LIBC.lisaf_phasemeter_main(
len(self.encoded_options),
self.encoded_options,
self.encoded_path_to_log_file)
# Otherwise None pointer is transmited
else :
self.status = self.LIBC.lisaf_phasemeter_main(len(self.encoded_options),
self.encoded_options,None)
#%% Multistart
if __name__ == '__main__' :
# This function is used to start acquisition on multiple phasemeters
my_phase = Phasemeter(name="PH1")
my_phase = Phasemeter(name="PH2")
with multiprocessing.Pool(len(Phasemeter.phase_list)) as Process :
Process.map(start,Phasemeter.phase_list)
Upvotes: 0
Views: 2621
Reputation: 44128
You solved half of your problem by getting rid of your lambda function. But now you are still in a "pickle." In your Phasemeter.__init__
method you have:
self.LIBC = ctypes.CDLL(self.path)
That code executes in the main process while method Phasemeter.start
is being executed in a different process. Therefore, there is attempt to serialize/deserialize self.LIBC
from one address space to another using pickle
, and this fails. I would suggest postponing setting this attribute until the start
method so that it does not have to be pickled:
import multiprocessing
#%% Initialization
#%% Function definition
#Fix pickle problem ?
def start(Phasemeter_object):
Phasemeter_object.start()
#%% Class definiton
class Phasemeter :
# List of all phasemeters objects, accessed by calling Phasemeter.phase_list
phase_list=[]
#%% Initialization
def __init__(self,*args,**kwargs) :
#%% Robustness. Check type of passed arguments
#%% Path setting, parsing config file
#%% Option handling
#%% Debug, used only if verbose argument is passed in start method
#%% Defining path to shared object file
self.path=os.path.abspath(os.path.join(os.getcwd(),
self.path_to_so_file))
# Add object to phase_list, list used in multistart method
Phasemeter.phase_list.append(self)
#%% Start
def start(self):
# LIBC is now an object and its method are C Code's functions
self.LIBC = ctypes.CDLL(self.path)
# Settings C library's signature datatype with ctypes data structre tool
# INFO: To see all datas types that can be transmited to C Code
# read ctypes documentation
# First argument is int : argc
# Second argument is string array : argv
# Third is a string : path_to_log_file
self.LIBC.lisaf_phasemeter_main.argtypes= [ctypes.c_int,
ctypes.POINTER(ctypes.c_char_p),
ctypes.c_char_p,]
# Return type is int : Return code
self.LIBC.lisaf_phasemeter_main.restypes = [ctypes.c_int,]
#%% Marshalling data for C library
# Create a string array with option list length size
self.c_char_pointer_array = ctypes.c_char_p * len(self.options)
# Encode option list
self.encoded_options = [str.encode(str(i)) for i in self.options ]
# Fill the string array with encoded strings
# REMINDER: C code only understand encoded strings
self.encoded_options = self.c_char_pointer_array (*self.encoded_options)
#%% Calling C library wihth encoded options
# If the logfile option is activated then the encoded
# string is transmited
if self.encoded_path_to_log_file :
self.status = self.LIBC.lisaf_phasemeter_main(
len(self.encoded_options),
self.encoded_options,
self.encoded_path_to_log_file)
# Otherwise None pointer is transmited
else :
self.status = self.LIBC.lisaf_phasemeter_main(len(self.encoded_options),
self.encoded_options,None)
#%% Multistart
if __name__ == '__main__' :
# This function is used to start acquisition on multiple phasemeters
my_phase = Phasemeter(name="PH1")
my_phase = Phasemeter(name="PH2")
with multiprocessing.Pool(len(Phasemeter.phase_list)) as Process :
Process.map(start,Phasemeter.phase_list)
As a bonus you are parallelizing more code.
Upvotes: 2