Reputation: 13
I am doing a multiprocessing task, there I need to send a value between the processors. For that I used the queue.
def fun(queue_value):
sleep_value = random.choise(range(1, 20))
time.sleep()
queue_value.put(sleep_value, 3)
value = None
queue_value = multiprocessing.Queue()
process = multiprocessing.Process(target=fun, args=(queue_value))
process.daemon = True
process.start()
print('PID', process.pid) #pid shows as 93848
time.sleep(10) # sleeps for 10 sec and then check for the value and the queue will be closed
if not queue_value.empty():
print(queue_value.get(timeout=3))
queue_value.close()
queue_value.join_thread() # As per the python doc, join_thread() should used only after calling close()
process.join()
if process.is_alive():
print('process is still alive')
process.terminate()
print('Is process alive', process.is_alive(), process.exitcode)
Some times, when the sleep_value
in fun
have value more than 10
. lets say sleep_value
got 18
At that time, the main process sleeps for 10 sec, and close/join the queue. In this flow, I wont get the value from the queue.
then, queue_value.close()
closes the queue-process. At this time, the child process stills in sleep for another 8 seconds
As per the python doc, the queue should be closed or joined before the multiprocessing-process gets joined https://docs.python.org/3.4/library/multiprocessing.html#all-start-methods:~:text=Joining%20processes%20that%20use%20queues
the process is still alive even after calling process.join()
function
The if process.is_alive()
condition executes
and even after process.terminate()
is called, the process is still alive
I got the output for the last print statement as Is process alive True None
process.exitcode
will return 0 if the process exited successfully and
if the process is alive, then it will return None
Question 1: Why the child process is not terminated? Is it because the child process sends/puts the value
in the queue, after the queue_value.close()
is called?
From this output, I get that the process is not terminated, and before the main program completes its execution
I checked the ps -ef
in the terminal, there I didnt see this child process-PID which is 93848
.
So, this means that the process is terminated before I was checking in ps -ef
command
Question 2: But, does this child process exited successfully? How to get to know this
To terminate the child process, Is it good to do sync way of multi processing without POOL like in this answer - https://stackoverflow.com/a/25369768/14786386
Sometimes, it creates Zombi process. How to handle that
Thanks
Upvotes: 1
Views: 91
Reputation: 44148
First, you have not created your process correctly. The following ...
process = multiprocessing.Process(target=fun, args=(queue_value))
... should be (you are missing a comma):
process = multiprocessing.Process(target=fun, args=(queue_value,))
Second, you have:
sleep_value = random.choise(range(1, 20))
time.sleep()
There is no choise
method and you are missing the secs argument to sleep()
Third, you have:
queue_value.put(sleep_value, 3)
That 3
is being interpreted as the block argument, not the timeout argument. You probably meant:
queue_value.put(sleep_value, True, 3)
But you could never even reach this statement because of your erroneous previous call choise
would raise an exception. Also, there is no reason to expect that this call to put
would ever block since this is the one and only put
being done to the queue, so I am not sure what the point is of specifying a timeout in this particular example.
Fourth, you should not be testing queue_value.empty()
-- this method is unreliable (read the documentation!!!). There could still be something in the process of being put on the queue when this method returns True and you then fall through to issue queue_value.join_thread()
. But the thread will never terminate if you haven't first retrieved all items on the queue with queue_value.get()
.
You should have attempted to do some simple debugging this yourself even if it were just by putting print
statements in various locations to see what statements were getting executed. Reading the manual carefully is also a good idea.
This is how I would re-code this:
import multiprocessing
from queue import Empty
import random
import time
def fun(queue_value):
sleep_value = random.choice(range(1, 20))
print(f'Sleeping for {sleep_value} seconds.')
time.sleep(sleep_value)
queue_value.put(sleep_value)
def main():
value = None
queue_value = multiprocessing.Queue()
process = multiprocessing.Process(target=fun, args=(queue_value,))
process.daemon = True
process.start()
print('PID', process.pid) #pid shows as 93848
# time.sleep(10) # sleeps for 10 sec and then check for the value and the queue will be closed
try:
value = queue_value.get(timeout=3)
except Empty:
print('We timed out on get. Doing get with no timeout:')
value = queue_value.get()
finally:
print(f'Got value {value} from the queue.')
# Now the queue is empty
queue_value.close()
queue_value.join_thread() # As per the python doc, join_thread() should used only after calling close()
process.join()
# The process cannot possibly be alive now if we reach this point
"""
if process.is_alive():
print('process is still alive')
process.terminate()
"""
print('Is process alive', process.is_alive(), process.exitcode)
# Required for Windows:
if __name__ == '__main__':
main()
Prints:
PID 14840
Sleeping for 13 seconds.
We timed out on get. Doing get with no timeout:
Got value 13 from the queue.
Is process alive False 0
You are testing the exit code from the process. That is a good indication of whether the process raised an exception or not. If it raises an exception that is not caught before it has a chance to put the value the main process is expecting on the queue, then the main process will definitely get a timeout exception. In a real situation you would not then do a second call to get
with no timeout specified since this will hang forever. Of course, in a real situation you might want to try to catch possible exceptions and put something meaningful of the queue, such as the exception instance.
Upvotes: 0