Pankaj

Reputation: 5250

Python multiprocessing Deadlock using Queue

I have a Python program like the one below.

from multiprocessing import Lock, Process, Queue, current_process
import time

lock = Lock()


def do_job(tasks_to_accomplish, tasks_that_are_done):
    while not tasks_to_accomplish.empty():
        task = tasks_to_accomplish.get()
        print(task)
        lock.acquire()
        tasks_that_are_done.put(task + ' is done by ' + current_process().name)
        lock.release()
        time.sleep(1)
    return True


def main():
    number_of_task = 10
    number_of_processes = 4
    tasks_to_accomplish = Queue()
    tasks_that_are_done = Queue()
    processes = []

    for i in range(number_of_task):
        tasks_to_accomplish.put("Task no " + str(i))

    # creating processes
    for w in range(number_of_processes):
        p = Process(target=do_job, args=(tasks_to_accomplish, tasks_that_are_done))
        processes.append(p)
        p.start()


    # completing process
    for p in processes:
        p.join()

    # print the output
    while not tasks_that_are_done.empty():
        print(tasks_that_are_done.get())

    return True


if __name__ == '__main__':
    main()

Sometimes the program runs perfectly, but sometimes it gets stuck and doesn't complete. When I quit it manually, it produces the following error.

$ python3 multiprocessing_example.py 
Task no 0
Task no 1
Task no 2
Task no 3
Task no 4
Task no 5
Task no 6
Task no 7
Task no 8
Task no 9
^CProcess Process-1:
Traceback (most recent call last):
  File "multiprocessing_example.py", line 47, in <module>
    main()
  File "multiprocessing_example.py", line 37, in main
    p.join()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 121, in join
    res = self._popen.wait(timeout)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/popen_fork.py", line 51, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/popen_fork.py", line 29, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "multiprocessing_example.py", line 9, in do_job
    task = tasks_to_accomplish.get()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/queues.py", line 94, in get
    res = self._recv_bytes()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt

Can someone tell me what the issue with the program is? I am using Python 3.6.

Upvotes: 3

Views: 1644

Answers (1)

stovfl

Reputation: 15533

Note: Lock is not needed around a Queue.

    lock.acquire()
    tasks_that_are_done.put(task + ' is done by ' + current_process().name)
    lock.release()

From the Queue documentation:
"The Queue class in this module implements all the required locking semantics."


Question: ... what is the issue with the program?

You are using Queue.empty() together with Queue.get(); that combination can deadlock at join(), because there is no guarantee that the queue is still non-empty by the time get() is reached: two workers can both pass the empty() check, one takes the last task, and the other blocks forever in get() while main() waits in join().

Deadlock prone:

while not tasks_to_accomplish.empty():
    task = tasks_to_accomplish.get()

Instead of the empty()/get() pair, use, for instance:

import queue

while True:
    try:
        task = tasks_to_accomplish.get_nowait()
    except queue.Empty:
        break
    else:
        # Handle the task here
        ...

(Note: task_done() is only available on multiprocessing.JoinableQueue, not on the plain multiprocessing.Queue used here, so it is omitted.)

Upvotes: 4
