vojtam

Reputation: 1245

Multiprocessing BrokenPipeError

I have the following problem. I am running a parallel task and I am getting this error:

Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "eclat_model.py", line 127, in do_work
    function(*args, work_queue, valid_list)
  File "eclat_model.py", line 115, in eclat_parallel_helper
    valid_list.extend(next_vectors)
  File "<string>", line 2, in extend
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 834, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

Relevant functions in eclat_model.py look like this:

def eclat_parallel_helper(index, bit_vectors, min_support, work_queue, valid_list):
    next_vectors = []
    for j in range(index + 1, len(bit_vectors)):
        # Combine the itemset at `index` with each later itemset:
        # union of the item bits, intersection of the transaction bits.
        item_vector = bit_vectors[index][0] | bit_vectors[j][0]
        transaction_vector = bit_vectors[index][1] & bit_vectors[j][1]
        support = get_vector_support(transaction_vector)
        if support >= min_support:
            next_vectors.append((item_vector, transaction_vector, support))
    if len(next_vectors) > 0:
        valid_list.extend(next_vectors)
        # Schedule each newly found itemset for further extension.
        for i in range(len(next_vectors)):
            work_queue.put((eclat_parallel_helper, (i, next_vectors, min_support)))


from queue import Empty as QueueEmptyError  # raised by get_nowait() on an empty queue


def do_work(work_queue, valid_list, not_done):
    # work queue entries have the form (function, args)
    while not_done.value:
        try:
            function, args = work_queue.get_nowait()
        except QueueEmptyError:
            continue
        function(*args, work_queue, valid_list)
        work_queue.task_done()
    work_queue.close()

EDIT:

bit_vectors is a list of lists, where each entry has the form [items, transactions, support]: items is a bit vector encoding which items appear in the itemset, transactions is a bit vector encoding which transactions the itemset appears in, and support is the number of transactions in which the itemset occurs.
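For illustration, the entries might look like this (a sketch with made-up values, assuming the bit vectors are plain Python ints, which matches the | and & operations in eclat_parallel_helper):

bit_vectors = [
    [0b001, 0b1011, 3],  # itemset {A} occurs in transactions 0, 1, 3
    [0b010, 0b0111, 3],  # itemset {B} occurs in transactions 0, 1, 2
    [0b100, 0b1100, 2],  # itemset {C} occurs in transactions 2, 3
]

The multiprocessing part of the code is as follows: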

from multiprocessing import Process, JoinableQueue, Manager, Value, cpu_count


def eclat_parallel(bit_vectors, min_support):
    not_done = Value('i', 1)
    manager = Manager()
    valid_list = manager.list()
    work_queue = JoinableQueue()
    for i in range(len(bit_vectors)):
        work_queue.put((eclat_parallel_helper, (i, bit_vectors, min_support)))

    processes = []
    for i in range(cpu_count()):
        p = Process(target=do_work, args=(work_queue, valid_list, not_done), daemon=True)
        p.start()
        processes.append(p)

    work_queue.join()

    not_done.value = 0

    work_queue.close()

    valid_itemset_vectors = bit_vectors
    for element in valid_list:
        valid_itemset_vectors.append(element)

    for p in processes:
        p.join()

    return valid_itemset_vectors

What does this error mean, please? Am I appending too many elements into the next_vectors list?

Upvotes: 0

Views: 1822

Answers (1)

Adri Moya

Reputation: 21

I had the same issue; in my case, just adding a delay (time.sleep(0.01)) solved it. The problem is that the individual processes poll the queue too quickly, and that causes the error.
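For example, the delay can go in the empty-queue branch of do_work, so idle workers back off instead of polling in a tight loop (a minimal sketch based on the loop from the question):

import time
from queue import Empty as QueueEmptyError

def do_work(work_queue, valid_list, not_done):
    while not_done.value:
        try:
            function, args = work_queue.get_nowait()
        except QueueEmptyError:
            time.sleep(0.01)  # brief pause before polling the queue again
            continue
        function(*args, work_queue, valid_list)
        work_queue.task_done()
    work_queue.close()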

Upvotes: 2
