Kota Mori

Reputation: 6730

Python multiprocessing Process does not terminate

While implementing a parallel operation in Python with the multiprocessing library, I noticed that some processes fail to terminate in a non-intuitive manner.

My program consists of maker processes that generate data and push it onto a shared queue, and a user process that receives the data from the queue.

Below is a simplified example. make_data generates random numbers and pushes them onto the queue, and use_data receives the data and computes the running average. In total, 2*1000 = 2000 numbers are generated, and all of them are used. This code runs as expected: in the end, all processes have terminated and no data is left in the queue.

import random
from multiprocessing import Process, Queue

q = Queue(maxsize=10000)
def make_data(q):
    for i in range(1000):
        x = random.random()
        q.put(x)
    print("final line of make data")

def use_data(q):
    i = 0
    res = 0.0
    while i < 2000:
        if q.empty():
            continue
        i += 1
        x = q.get()
        res = res*(i-1)/i + x/i
    print("iter %6d, avg = %.5f" % (i, res))

u = Process(target=use_data, args=(q,))
u.start()

p1 = Process(target=make_data, args=(q,))
p1.start()
p2 = Process(target=make_data, args=(q,))
p2.start()


u.join(timeout=10)
p1.join(timeout=10)
p2.join(timeout=10)
print(u.is_alive(), p1.is_alive(), p2.is_alive(), q.qsize())

Outcome:

final line of make data
final line of make data
iter   2000, avg = 0.49655
False False False 0

Things change when I let the makers generate more data than necessary. The code below differs from the one above only in that each maker generates 5000 numbers, so not all of the data is used. When this is run, it prints the final-line messages, but the maker processes never terminate (Ctrl-C is needed to stop the program).

import random
from multiprocessing import Process, Queue

q = Queue(maxsize=10000)
def make_data(q):
    for i in range(5000):
        x = random.random()
        q.put(x)
    print("final line of make data")

def use_data(q):
    i = 0
    res = 0.0
    while i < 2000:
        if q.empty():
            continue
        i += 1
        x = q.get()
        res = res*(i-1)/i + x/i
    print("iter %6d, avg = %.5f" % (i, res))

u = Process(target=use_data, args=(q,))
u.start()

p1 = Process(target=make_data, args=(q,))
p1.start()
p2 = Process(target=make_data, args=(q,))
p2.start()


u.join(timeout=10)
p1.join(timeout=10)
p2.join(timeout=10)
print(u.is_alive(), p1.is_alive(), p2.is_alive(), q.qsize())

Outcome:

final line of make data
final line of make data
iter   2000, avg = 0.49388
False True True 8000
# and the program never finishes

It looks to me as if all the processes run to the end, so I wonder why they stay alive. Can someone help me understand this phenomenon?

I ran this program on Python 3.6.6 from the Miniconda distribution.

Upvotes: 1

Views: 993

Answers (1)

Kurtis Rader

Reputation: 7459

The child processes putting items into the queue are stuck trying to actually put those items into the queue.

A normal, non-multiprocessing Queue object is implemented entirely in the address space of a single process. In that case maxsize is the number of items that can be enqueued before a put() call blocks. But a multiprocessing Queue object is implemented using an IPC mechanism, typically a pipe, and an OS pipe can buffer only a finite number of bytes (a typical limit is 8 KB). So when your use_data() process terminates after dequeuing just 2000 items, the make_data() processes block on exit: their IPC channel is full while they try to flush the locally queued items into it. This means they never actually exit; your timed join() calls return with the processes still alive, and the program then hangs at interpreter exit waiting for its non-daemon children.

In effect you've created a deadlock. The exact threshold at which that occurs depends on how much data the IPC channel can buffer. For example, on one of my Linux servers your second example works reliably with this inserted between the u.join() and the p1.join():

for _ in range(4000):
    q.get()  # drain leftover items so the producers can flush the rest and exit

Reducing that range slightly (e.g., to 3990) produces intermittent hangs. Reducing it further (e.g., to 3500) makes the program hang every time, because at least one of the processes stuffing data into the queue still blocks while flushing its items into the IPC channel.
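
A count-free variant (my sketch, not part of the answer above) avoids guessing the exact number of leftover items: keep pulling until the queue stays quiet for a moment, then join the producers. It assumes the producers have already generated everything they intend to by this point.

import queue  # provides the Empty exception raised on timeout

# Drain whatever the producers buffered; stop once nothing arrives for 1s.
# Assumes no producer will put new items after this point.
while True:
    try:
        q.get(timeout=1)
    except queue.Empty:
        break

p1.join()
p2.join()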

The lesson of this story? Always fully drain a multiprocessing queue before attempting to wait for the processes to terminate.
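
Alternatively, if the leftover items are genuinely disposable, the documented Queue.cancel_join_thread() method (not used in the answer above) lets a producer exit without waiting for its feeder thread to flush the pipe, at the cost of possibly losing the buffered data. A sketch of the modified make_data:

def make_data(q):
    for i in range(5000):
        q.put(random.random())
    # Don't block process exit waiting for the feeder thread to flush
    # buffered items into the pipe; items still buffered may be lost.
    q.cancel_join_thread()
    print("final line of make data")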

Upvotes: 2
