Jonathan Kittell
Jonathan Kittell

Reputation: 7493

TypeError: can't pickle _thread.lock objects

Trying to run two different functions at the same time with shared queue and get an error...how can I run two functions at the same time with a shared queue? This is Python version 3.6 on Windows 7.

from multiprocessing import Process
from queue import Queue
import logging

def main():
    x = DataGenerator()
    try:
        x.run()
    except Exception as e:
        logging.exception("message")


class DataGenerator:

    def __init__(self):
        logging.basicConfig(filename='testing.log', level=logging.INFO)

    def run(self):
        logging.info("Running Generator")
        queue = Queue()
        Process(target=self.package, args=(queue,)).start()
        logging.info("Process started to generate data")
        Process(target=self.send, args=(queue,)).start()
        logging.info("Process started to send data.")

    def package(self, queue): 
        while True:
            for i in range(16):
                datagram = bytearray()
                datagram.append(i)
                queue.put(datagram)

    def send(self, queue):
        byte_array = bytearray()
        while True:
            size_of__queue = queue.qsize()
            logging.info(" queue size %s", size_of_queue)
            if size_of_queue > 7:
                for i in range(1, 8):
                    packet = queue.get()
                    byte_array.append(packet)
                logging.info("Sending datagram ")
                print(str(datagram))
                byte_array(0)

if __name__ == "__main__":
    main()

The logs indicate an error, I tried running console as administrator and I get the same message...

INFO:root:Running Generator
ERROR:root:message
Traceback (most recent call last):
  File "test.py", line 8, in main
    x.run()
  File "test.py", line 20, in run
    Process(target=self.package, args=(queue,)).start()
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects

Upvotes: 115

Views: 336926

Answers (7)

Lrx
Lrx

Reputation: 397

I'm adding a contribution here :

I had a dictionary containing some clients that would perform http requests. (I don't say it's good practice, I'm just describing what happened)

I was using some external code.

at some point the following was happening :

import copy

d: dict[str,WebService]
...
d = copy.deepcopy(d)

Solved it by removing this line.

So yeah, copy need to serialize objects and the services I was using weren't suited for this. I hope it can help

Upvotes: 1

Navjot Kashi
Navjot Kashi

Reputation: 141

This issue may be on non-linux OS i.e. Windows & MacOS so I tried below code and it fixed the issue for me.

import platform
if platform.system() != "Linux":
    from multiprocessing import set_start_method
    set_start_method("fork")

Add above code in websockets.py file

optionchain_stream from Zerodha kite trade

P.S: encountered this error in optionchain_stream while trying to fetch the option chain from Zerodha kite.trade

Upvotes: 2

Akaisteph7
Akaisteph7

Reputation: 6506

As this is the first answer that shows up when searching for this issue, I will also add my solutions here.

This issue can be caused by many things. Here are two scenarios I have encountered:

  1. Package incompatibility
    • Problem: Trying to use multiprocessing when another part of your code that eventually gets called also needs to create new processes or is incompatible with being copied to a new process because of the use of locks, for example.
    • Solution: My issue was with trying to use a single connection to a MongoDB instance for all of my processes. Creating a new connection for each process resolved the issue.
  2. Class instance
    • Problem: Trying to call pool.starmap from inside of a class to another function in the class. Making it a staticmethod or having a function on the outside call it didn't work and gave the same error. A class instance just can't be pickled so we need to create the instance after we start the multiprocessing.
    • Solution: What I ended up doing that worked for me was to separate my class into two classes. Basically, the function you are calling the multiprocessing on needs to be called right after you instantiate a new object for the class it belongs to. Something like this:
from multiprocessing import Pool

class B:
    ...
    def process_feature(idx, feature):
        # do stuff in the new process
        pass
    ...

def multiprocess_feature(process_args):
    b_instance = B()
    return b_instance.process_feature(*process_args)

class A:
    ...
    def process_stuff():
        ...
        with Pool(processes=num_processes, maxtasksperchild=10) as pool:
            results = pool.starmap(
                multiprocess_feature,
                [
                    (idx, feature)
                    for idx, feature in enumerate(features)
                ],
                chunksize=100,
            )
        ...
    ...

...

Upvotes: 3

user8675687
user8675687

Reputation: 401

You need to change from queue import Queue to from multiprocessing import Queue.

The root reason is the former Queue is designed for threading module Queue while the latter is for multiprocessing.Process module.

Upvotes: 39

Roque
Roque

Reputation: 391

Complementing Marina answer here something to access the whole class. It also fools Pool.map as I needed today.

fakeSelf = None

def run_parallel(num, shared_new_num_list, to_add): # to_add is passed as an argument
    new_num = num + fakeSelf.num_to_add
    shared_new_num_list.append(new_num)

class DataGenerator:
    def __init__(self, num_list, num_to_add)
        globals()['fakeSelf'] = self
        self.num_list = num_list # e.g. [4,2,5,7]
        self.num_to_add = num_to_add # e.g. 1

        self.run()

    def run(self):
        new_num_list = Manager().list()

Upvotes: 1

Marina
Marina

Reputation: 693

I had the same problem with Pool() in Python 3.6.3.

Error received: TypeError: can't pickle _thread.RLock objects

Let's say we want to add some number num_to_add to each element of some list num_list in parallel. The code is schematically like this:

class DataGenerator:
    def __init__(self, num_list, num_to_add)
        self.num_list = num_list # e.g. [4,2,5,7]
        self.num_to_add = num_to_add # e.g. 1 

        self.run()

    def run(self):
        new_num_list = Manager().list()

        pool = Pool(processes=50)
        results = [pool.apply_async(run_parallel, (num, new_num_list)) 
                      for num in num_list]
        roots = [r.get() for r in results]
        pool.close()
        pool.terminate()
        pool.join()

    def run_parallel(self, num, shared_new_num_list):
        new_num = num + self.num_to_add # uses class parameter
        shared_new_num_list.append(new_num)

The problem here is that self in function run_parallel() can't be pickled as it is a class instance. Moving this parallelized function run_parallel() out of the class helped. But it's not the best solution as this function probably needs to use class parameters like self.num_to_add and then you have to pass it as an argument.

Solution:

def run_parallel(num, shared_new_num_list, to_add): # to_add is passed as an argument
    new_num = num + to_add
    shared_new_num_list.append(new_num)

class DataGenerator:
    def __init__(self, num_list, num_to_add)
        self.num_list = num_list # e.g. [4,2,5,7]
        self.num_to_add = num_to_add # e.g. 1

        self.run()

    def run(self):
        new_num_list = Manager().list()

        pool = Pool(processes=50)
        results = [pool.apply_async(run_parallel, (num, new_num_list, self.num_to_add)) # num_to_add is passed as an argument
                      for num in num_list]
        roots = [r.get() for r in results]
        pool.close()
        pool.terminate()
        pool.join()

Other suggestions above didn't help me.

Upvotes: 53

PvdL
PvdL

Reputation: 1593

multiprocessing.Pool - PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed

Move the queue to self instead of as an argument to your functions package and send

Upvotes: 11

Related Questions