Vishak Raj

Reputation: 13

What happens when a value is put on a Queue after the Queue is closed/joined in Python multiprocessing?

I am doing a multiprocessing task in which I need to send a value between processes. For that I used a queue.

def fun(queue_value):
    sleep_value = random.choise(range(1, 20))
    time.sleep()
    queue_value.put(sleep_value, 3)

value = None
queue_value = multiprocessing.Queue()
process = multiprocessing.Process(target=fun, args=(queue_value))
process.daemon = True

process.start()
print('PID', process.pid) #pid shows as 93848

time.sleep(10) # sleeps for 10 sec and then check for the value and the queue will be closed

if not queue_value.empty():
    print(queue_value.get(timeout=3))

queue_value.close()
queue_value.join_thread() # As per the Python docs, join_thread() should be used only after calling close()
process.join()

if process.is_alive():
    print('process is still alive')
    process.terminate()

print('Is process alive', process.is_alive(), process.exitcode)

Sometimes the sleep_value in fun is greater than 10; let's say sleep_value is 18.

In that case the main process sleeps for 10 seconds and then closes/joins the queue. In this flow, I won't get the value from the queue.

Then queue_value.close() closes the queue. At this point the child process is still sleeping for another 8 seconds.

As per the Python docs, the queue should be closed or joined before the multiprocessing process gets joined: https://docs.python.org/3.4/library/multiprocessing.html#all-start-methods:~:text=Joining%20processes%20that%20use%20queues

The process is still alive even after calling process.join().

The if process.is_alive() branch executes, and even after process.terminate() is called the process is still alive.

The output of the last print statement is: Is process alive True None

process.exitcode returns 0 if the process exited successfully and None if the process is still alive.

Question 1: Why is the child process not terminated? Is it because the child process puts the value in the queue after queue_value.close() has been called?

From this output I gather that the process has not terminated. However, before the main program finished executing I ran ps -ef in a terminal and did not see the child process PID (93848). So this means the process had already terminated by the time I checked with ps -ef.

Question 2: But did this child process exit successfully? How can I find out?

To terminate the child process, is it a good idea to do multiprocessing synchronously without a Pool, as in this answer: https://stackoverflow.com/a/25369768/14786386

Sometimes it creates a zombie process. How should I handle that?

Thanks

Upvotes: 1

Views: 91

Answers (1)

Booboo

Reputation: 44148

First, you have not created your process correctly. The following ...

    process = multiprocessing.Process(target=fun, args=(queue_value))

... should be (you are missing a comma):

    process = multiprocessing.Process(target=fun, args=(queue_value,)) 
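As a quick illustration of why the comma matters, here is a minimal sketch. The string used below is only a hypothetical stand-in for queue_value; the point is that parentheses alone group an expression, and only the trailing comma produces a one-element tuple.

```python
# Hypothetical stand-in for queue_value, used only for illustration.
value = 'queue placeholder'

not_a_tuple = (value)   # parentheses only group: this is still the string
one_tuple = (value,)    # the trailing comma builds a one-element tuple

print(type(not_a_tuple).__name__)
print(type(one_tuple).__name__, len(one_tuple))
```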

Second, you have:

    sleep_value = random.choise(range(1, 20))
    time.sleep()

There is no choise function in the random module (it is spelled choice), and you are missing the secs argument to time.sleep().

Third, you have:

    queue_value.put(sleep_value, 3)

That 3 is being interpreted as the block argument, not the timeout argument. You probably meant:

    queue_value.put(sleep_value, True, 3)

But you would never even reach this statement, because the earlier call to choise raises an exception. Also, there is no reason to expect this call to put to ever block, since it is the one and only put being done to the queue, so I am not sure what the point is of specifying a timeout in this particular example.
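To show when a timeout on put actually matters, here is a small sketch. The helper name put_with_timeout and the maxsize=1 queue are assumptions made for the demo, not part of the original code. A put can only block when the queue is bounded and already full, in which case queue.Full is raised once the timeout expires.

```python
import multiprocessing
import queue  # multiprocessing.Queue raises queue.Full and queue.Empty


def put_with_timeout(q, item, timeout):
    """Return True if the item was queued, False if the queue stayed full."""
    try:
        # Same as q.put(item, True, timeout): block is the second argument.
        q.put(item, block=True, timeout=timeout)
        return True
    except queue.Full:
        return False


if __name__ == '__main__':
    # maxsize=1 is chosen only so that the second put has to wait.
    bounded = multiprocessing.Queue(maxsize=1)
    print(put_with_timeout(bounded, 'first', 0.5))   # the queue has room
    print(put_with_timeout(bounded, 'second', 0.5))  # the queue is full
    print(bounded.get(timeout=1))                    # drain before exiting
```

With an unbounded queue, as in the question, the put has nothing to wait for, so the timeout has no effect.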

Fourth, you should not be testing queue_value.empty(): this method is unreliable (read the documentation!). There could still be something in the process of being put on the queue when this method returns True, in which case you fall through and call queue_value.join_thread(). But the thread will never terminate if you haven't first retrieved all items on the queue with queue_value.get().
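One way to avoid empty() altogether is to attempt the get with a timeout and handle queue.Empty. The following is only a sketch; the helper name get_or_default is hypothetical.

```python
import multiprocessing
from queue import Empty


def get_or_default(q, timeout, default=None):
    """Wait up to `timeout` seconds for an item; return `default` if none arrives."""
    try:
        return q.get(timeout=timeout)
    except Empty:
        return default


if __name__ == '__main__':
    q = multiprocessing.Queue()
    print(get_or_default(q, 0.2, default='nothing yet'))  # nothing has been queued
    q.put(42)
    print(get_or_default(q, 2, default='nothing yet'))    # the item arrives in time
```

The timeout gives an in-flight item time to arrive, which a single empty() check cannot do.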

You should have attempted some simple debugging yourself, even if only by putting print statements in various locations to see which statements were being executed. Reading the manual carefully is also a good idea.

This is how I would re-code this:

import multiprocessing
from queue import Empty
import random
import time

def fun(queue_value):
    sleep_value = random.choice(range(1, 20))
    print(f'Sleeping for {sleep_value} seconds.')
    time.sleep(sleep_value)
    queue_value.put(sleep_value)

def main():
    value = None
    queue_value = multiprocessing.Queue()
    process = multiprocessing.Process(target=fun, args=(queue_value,))
    process.daemon = True

    process.start()
    print('PID', process.pid) #pid shows as 93848

    # time.sleep(10) # sleeps for 10 sec and then check for the value and the queue will be closed

    try:
        value = queue_value.get(timeout=3)
    except Empty:
        print('We timed out on get. Doing get with no timeout:')
        value = queue_value.get()
    finally:
        print(f'Got value {value} from the queue.')


    # Now the queue is empty

    queue_value.close()
    queue_value.join_thread() # As per the Python docs, join_thread() should be used only after calling close()
    process.join()

    # The process cannot possibly be alive now if we reach this point
    """
    if process.is_alive():
        print('process is still alive')
        process.terminate()
    """

    print('Is process alive', process.is_alive(), process.exitcode)

# Required for Windows:
if __name__ == '__main__':
    main()

Prints:

PID 14840
Sleeping for 13 seconds.
We timed out on get. Doing get with no timeout:
Got value 13 from the queue.
Is process alive False 0
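The exit code on the last line of that output can be interpreted with a small helper. This is a sketch with a hypothetical function name, describe_exitcode, based on the documented meaning of Process.exitcode: None while the process has not terminated, 0 for a normal return, a positive status for an error exit, and -N when the process was killed by signal N.

```python
def describe_exitcode(exitcode):
    """Translate a multiprocessing.Process.exitcode value into readable text."""
    if exitcode is None:
        return 'has not terminated yet'
    if exitcode == 0:
        return 'exited normally'
    if exitcode < 0:
        return f'terminated by signal {-exitcode}'
    return f'exited with error status {exitcode}'


if __name__ == '__main__':
    for code in (None, 0, 1, -15):
        print(code, '->', describe_exitcode(code))
```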

You are testing the exit code from the process. That is a good indication of whether the process raised an exception or not. If the process raises an uncaught exception before it has a chance to put the value the main process is expecting on the queue, then the main process will definitely get a timeout exception. In a real situation you would not then do a second call to get with no timeout, since that would hang forever. Of course, in a real situation you might want to catch possible exceptions and put something meaningful on the queue, such as the exception instance.
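A minimal sketch of that last idea follows. The names risky_work, worker and collect are hypothetical, and the ValueError merely simulates a failure. The worker always puts something on the queue, either the result or the exception instance, so the main process does not have to wait for a timeout to learn about a failure.

```python
import multiprocessing
from queue import Empty


def risky_work(fail):
    """Hypothetical stand-in for the real task."""
    if fail:
        raise ValueError('something went wrong in the worker')
    return 13


def worker(result_queue, fail=False):
    # Always put something on the queue: the result or the exception instance.
    try:
        result_queue.put(risky_work(fail))
    except Exception as exc:
        result_queue.put(exc)


def collect(result_queue, timeout):
    """Return ('ok', value), ('error', exception) or ('timeout', None)."""
    try:
        item = result_queue.get(timeout=timeout)
    except Empty:
        return 'timeout', None
    if isinstance(item, Exception):
        return 'error', item
    return 'ok', item


def main():
    result_queue = multiprocessing.Queue()
    process = multiprocessing.Process(target=worker, args=(result_queue, True))
    process.start()
    status, payload = collect(result_queue, timeout=10)
    process.join()  # safe: the only queued item has already been retrieved
    print(status, repr(payload), process.exitcode)


if __name__ == '__main__':
    main()
```

Because the worker catches the exception itself, its exit code no longer signals the failure, so the main process has to inspect what it received from the queue.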

Upvotes: 0
