First, I used the condition if not q.empty():
from multiprocessing import Process, Queue, current_process
import datetime, time

def func1(q):
    for i in range(5):
        if not q.full():
            print('Put %s to queue...' % i)
            q.put(i)
        else:
            print(current_process().name, 'is Full', current_process())

def func2(q):
    while True:
        if not q.empty():
            i = q.get(block=True, timeout=1)
            print('Get %s from queue.' % i)
        else:
            print('queue empty')
            break

if __name__=='__main__':
    q = Queue(10)
    ProA = Process(target=func1, name='A', args=(q,))
    ProB = Process(target=func2, name='B', args=(q,))
    ProA.daemon = True
    ProB.daemon = True
    print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive())
    print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive())
    ProA.start()
    print('*' * 10, 'Since Now Start Subprocess A')
    print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive())
    print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive())
    ProA.join()
    print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive())
    print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive())
    ProB.start()
    print('*' * 10, 'Since Now Start Subprocess B')
    print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive())
    print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive())
    ProB.join()
    print(datetime.datetime.now(), ProA.name, 'is Alive:', ProA.is_alive())
    print(datetime.datetime.now(), ProB.name, 'is Alive:', ProB.is_alive())
and the output looks like this:
2022-01-01 16:34:07.211678 A is Alive: False
2022-01-01 16:34:07.211738 B is Alive: False
********** Since Now Start Subprocess A
2022-01-01 16:34:07.213302 A is Alive: True
2022-01-01 16:34:07.213384 B is Alive: False
Put 0 to queue...
Put 1 to queue...
Put 2 to queue...
Put 3 to queue...
Put 4 to queue...
2022-01-01 16:34:07.244838 A is Alive: False
2022-01-01 16:34:07.244857 B is Alive: False
********** Since Now Start Subprocess B
2022-01-01 16:34:07.245714 A is Alive: False
2022-01-01 16:34:07.245756 B is Alive: True
Get 0 from queue.
Get 1 from queue.
Get 2 from queue.
Get 3 from queue.
Get 4 from queue.
queue empty
2022-01-01 16:34:07.272992 A is Alive: False
2022-01-01 16:34:07.273008 B is Alive: False
Process finished with exit code 0
However, when I changed the condition to if q.qsize() != 0, replacing func2 with
def func2(q):
    while True:
        try:
            if q.qsize() != 0:
                i = q.get(block=True, timeout=1)
                print('Get %s from queue.' % i)
        except:
            print('q.qsize == 0')
            break
I got
2022-01-01 16:37:09.681813 A is Alive: False
2022-01-01 16:37:09.681980 B is Alive: False
********** Since Now Start Subprocess A
2022-01-01 16:37:09.684073 A is Alive: True
2022-01-01 16:37:09.684134 B is Alive: False
Put 0 to queue...
Put 1 to queue...
Put 2 to queue...
Put 3 to queue...
Put 4 to queue...
2022-01-01 16:37:09.720998 A is Alive: False
2022-01-01 16:37:09.721029 B is Alive: False
********** Since Now Start Subprocess B
2022-01-01 16:37:09.722099 A is Alive: False
2022-01-01 16:37:09.722140 B is Alive: True
q.qsize == 0
2022-01-01 16:37:09.761179 A is Alive: False
2022-01-01 16:37:09.761202 B is Alive: False
Process finished with exit code 0
I expected to get at least a few values as output. However, it seems the code jumped straight to the except branch, which would mean q.qsize() is equal to 0 here?
Upvotes: 0
Views: 1618
Reputation: 44323
Here are some examples of how queues can be used. But first, there is something everyone needs to know about the underlying implementation. A multiprocessing.Queue is implemented using a multiprocessing.Pipe, which has a limited capacity. That is, a process cannot keep sending data to a pipe's underlying connection indefinitely without eventually blocking, unless some other process is reading the data. Yet you can define a queue with an arbitrarily large capacity, even an infinite capacity, so that you can continue to issue put calls to the queue without ever blocking. How is that accomplished? The queue instance actually starts a new thread that is responsible for doing the underlying writing to the pipe, and it is this thread that will block while the thread that called put can continue to run. But this means that the process will not terminate until the thread that was created to write to the pipe terminates, and that thread will not terminate until the pipe is able to accept the data.
What does all this mean? If you have a process A that is a consumer of items put on a queue by producer process B, process A must first get all the items from the queue before attempting to join process B (i.e. wait for process B to complete), because process B may be unable to complete while it is blocked waiting for process A to get items from the queue.
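The effect described above can be observed with a small experiment. The following is only a sketch: the names producer and demo are made up for illustration, and it assumes the payload (5 MB by default) is larger than the pipe's capacity on your platform. The parent tries to join the producer with a timeout before reading anything, records whether the producer is still alive, and only then reads the item and joins for real.

```python
from multiprocessing import Process, Queue


def producer(q, n_bytes):
    # put() returns at once; the queue's feeder thread does the pipe write.
    q.put(b"x" * n_bytes)


def demo(n_bytes=5_000_000, wait=1.0):
    q = Queue()
    proc = Process(target=producer, args=(q, n_bytes))
    proc.start()
    # Joining before reading: the producer cannot exit while its feeder
    # thread is still blocked writing to the pipe, so this join times out.
    proc.join(timeout=wait)
    alive_before_get = proc.is_alive()
    # Reading the item lets the feeder thread finish and the process exit.
    received = len(q.get())
    proc.join()
    return alive_before_get, received, proc.is_alive()


if __name__ == "__main__":
    print(demo())
```

Without the timeout on the first join, the parent would wait for the producer while the producer waits for a reader, which is the deadlock the rule about getting before joining is meant to avoid.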
Example 1
In this example the main process starts two new processes, a producer of items, func1, and a consumer of those items, func2, and waits for both processes to complete. One of the simplest ways to handle this situation is for the producer to put a special sentinel item, one that cannot be confused with a regular item, to indicate that there are no more items left to process. In other words, the sentinel acts as an end-of-file indicator. So the consumer simply does blocking get calls on the queue until it detects the sentinel item, and then it can terminate. In this case None is used as the sentinel.
from multiprocessing import Process, Queue

def func1(q):
    for i in range(10):
        print('Put %s to queue...' % i)
        q.put(i)
    # Put special item signaling there are no more:
    q.put(None)

def func2(q):
    while True:
        i = q.get(block=True)
        if i is None:
            # There will be no more items on the queue
            break
        print('Get %s from queue.' % i)

if __name__=='__main__':
    q = Queue()
    proA = Process(target=func1, name='A', args=(q,))
    proB = Process(target=func2, name='B', args=(q,))
    proA.start()
    proB.start()
    proA.join()
    proB.join()
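If more than one consumer reads from the same queue, a single sentinel stops only the consumer that happens to get it. A common extension of the pattern above, not part of the original example, is to put one sentinel per consumer. The sketch below assumes that approach; the names producer, consumer, run, and counts_q are illustrative only.

```python
from multiprocessing import Process, Queue

SENTINEL = None  # must never occur as a regular item


def producer(work_q, n_items, n_consumers):
    for i in range(n_items):
        work_q.put(i)
    # One sentinel per consumer, so every consumer sees exactly one.
    for _ in range(n_consumers):
        work_q.put(SENTINEL)


def consumer(work_q, counts_q):
    handled = 0
    while True:
        item = work_q.get()
        if item is SENTINEL:
            break
        handled += 1
    # Report how many regular items this consumer handled.
    counts_q.put(handled)


def run(n_items=10, n_consumers=3):
    work_q, counts_q = Queue(), Queue()
    consumers = [Process(target=consumer, args=(work_q, counts_q))
                 for _ in range(n_consumers)]
    for p in consumers:
        p.start()
    prod = Process(target=producer, args=(work_q, n_items, n_consumers))
    prod.start()
    # Read the counts before joining, so no process is left holding
    # unread data when we wait for it.
    total = sum(counts_q.get() for _ in range(n_consumers))
    prod.join()
    for p in consumers:
        p.join()
    return total


if __name__ == "__main__":
    print(run(), 'items handled in total')
```

Because each consumer exits after its first sentinel, the number of sentinels has to match the number of consumers exactly.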
Example 2
In this example, we have two queues: a "work queue", work_q, to which numbers to be squared are put, and a "results queue", results_q, to which the results of squaring the items gotten from the work queue are put. As before, func1 puts a sentinel to the work queue to indicate there are no more items to process. func2 does not need to put a sentinel to the results queue, since func1 knows how many results there should be on the queue.
from multiprocessing import Process, Queue

def func1(work_q, results_q):
    for i in range(10):
        work_q.put(i)
    # Put special item signaling there are no more:
    work_q.put(None)
    # There should be 10 results:
    for _ in range(10):
        print(results_q.get())

def func2(work_q, results_q):
    while True:
        i = work_q.get(block=True)
        if i is None:
            break
        results_q.put(i ** 2)

if __name__=='__main__':
    work_q = Queue()
    results_q = Queue()
    proA = Process(target=func1, name='A', args=(work_q, results_q))
    proB = Process(target=func2, name='B', args=(work_q, results_q))
    proA.start()
    proB.start()
    proA.join()
    proB.join()
Example 3
This example is similar to the previous one, except that func1 does not bother writing a sentinel, so func2 just loops forever getting from the work queue and putting to the results queue. Since func2 never terminates, the process that targets this function must be a daemon process so that it terminates when the main process terminates.
from multiprocessing import Process, Queue

def func1(work_q, results_q):
    for i in range(10):
        work_q.put(i)
    # No sentinel this time.
    # There should be 10 results:
    for _ in range(10):
        print(results_q.get())

def func2(work_q, results_q):
    while True:
        i = work_q.get(block=True)
        results_q.put(i ** 2)

if __name__=='__main__':
    work_q = Queue()
    results_q = Queue()
    proA = Process(target=func1, name='A', args=(work_q, results_q))
    proB = Process(target=func2, name='B', args=(work_q, results_q), daemon=True)
    proA.start()
    proB.start()
    proA.join()
Example 4
In all the prior examples, the main process started producer and consumer processes and then did nothing but wait for the completion of these processes. It seems that we could have done with one fewer process, i.e. the main process itself could be the producer.
from multiprocessing import Process, Queue

def func(work_q, results_q):
    while True:
        i = work_q.get(block=True)
        if i is None:
            break
        results_q.put(i ** 2)

if __name__=='__main__':
    work_q = Queue()
    results_q = Queue()
    for i in range(10):
        work_q.put(i)
    # Put special item signaling there are no more:
    work_q.put(None)
    proc = Process(target=func, name='A', args=(work_q, results_q))
    proc.start()
    # There should be 10 results:
    for _ in range(10):
        print(results_q.get())
    proc.join()
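Refer back to my opening discussion. Note that here the main process retrieves the 10 results from the child process, which is the producer of the results, before joining it. If I were to move the statement proc.join() before the loop that calls results_q.get(), it would be a grievous error: the child might be unable to terminate until its results have been read, so the main process could wait forever.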
Example 5
Finally, there is an indirect way of "counting" queue messages without getting an explicit count, and that is by using a multiprocessing.JoinableQueue, which supports two additional methods, task_done() and join(). In part, the documentation states:
task_done(): Indicate that a formerly enqueued task is complete. Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
join(): Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
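The counting rule quoted above can be tried out in a single process before involving any workers. This is a minimal sketch with an illustrative function name, unfinished_count_demo; it relies on the documented behavior that task_done() raises ValueError when called more times than there were items placed on the queue.

```python
from multiprocessing import JoinableQueue


def unfinished_count_demo():
    q = JoinableQueue()
    for item in ("a", "b"):
        q.put(item)          # unfinished count: 1, then 2
    events = []
    for _ in range(2):
        events.append(q.get())
        q.task_done()        # count goes down by one per call
    q.join()                 # count is zero, so this returns at once
    events.append("joined")
    try:
        q.task_done()        # one call more than there were items
    except ValueError:
        events.append("ValueError")
    return events


if __name__ == "__main__":
    print(unfinished_count_demo())
```

If either task_done() call inside the loop were omitted, the count would never reach zero and join() would block indefinitely.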
In the following example, the main process puts 10,000 numbers to be squared on the work queue. This time, however, the consumer process (which is also a producer of results), func, puts an indeterminate number of results to the results queue. So the main process cannot simply count the results it retrieves, because it does not know how many to expect. Nor can func put a sentinel item to the results queue, because it is in an infinite loop and thus has no way of knowing when to insert the sentinel.
Therefore, we use a JoinableQueue for the work queue. The main process can then call join on the work queue and know that, when that call returns, func has processed all the items on the work queue, i.e. it has put all the results to the results queue. The main process can then do non-blocking gets in a loop until it gets a queue.Empty exception. (The multiprocessing documentation does note that there may be an infinitesimal delay between a put and the item becoming retrievable with a non-blocking get.)
from multiprocessing import Process, Queue, JoinableQueue
import queue

def func(work_q, results_q):
    # This function "returns" an indeterminate number of results:
    while True:
        i = work_q.get(block=True)
        # Just return results for even values of i:
        if i % 2 == 0:
            results_q.put(i ** 2)
        # Show we have processed the item from the work_q:
        work_q.task_done()

if __name__=='__main__':
    work_q = JoinableQueue()
    results_q = Queue()
    proc = Process(target=func, name='A', args=(work_q, results_q), daemon=True)
    proc.start()
    for i in range(10_000):
        work_q.put(i)
    work_q.join()
    # All items have been read from the work_q and whatever results there are
    # have been put to the results_q:
    cnt = 0
    try:
        while True:
            print(results_q.get(block=False))
            cnt += 1
    except queue.Empty:
        pass
    print(cnt, 'items retrieved')
Prints:
0
4
16
36
64
100
...
99760144
99800100
99840064
99880036
99920016
99960004
5000 items retrieved
Example 6
This is the same as the previous example, but this time using only "regular" queues, with sentinels for both the work queue and the results queue. This also allows the main process to do something with the results as they are being produced, rather than having to wait for all results to be written to the results queue, as was the case with the previous example.
from multiprocessing import Process, Queue

def func(work_q, results_q):
    # This function "returns" an indeterminate number of results:
    while True:
        i = work_q.get(block=True)
        if i is None:
            break
        # Just return results for even values of i:
        if i % 2 == 0:
            results_q.put(i ** 2)
    results_q.put(None) # all results have been put

if __name__=='__main__':
    work_q = Queue()
    results_q = Queue()
    proc = Process(target=func, name='A', args=(work_q, results_q))
    proc.start()
    for i in range(10_000):
        work_q.put(i)
    work_q.put(None) # All items have been put
    cnt = 0
    while True:
        result = results_q.get(block=True)
        if result is None:
            break
        print(result)
        cnt += 1
    print(cnt, 'items retrieved')
    proc.join()
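Returning to the second version of func2 in the question: its bare except catches every exception, so the message 'q.qsize == 0' does not actually show that the queue was empty. One possible explanation, which the posted output cannot confirm, is that qsize() itself raised; the documentation notes that it may raise NotImplementedError on platforms such as macOS where sem_getvalue() is not implemented. If you do want a timeout-based loop rather than a sentinel, a cautious sketch is to catch only queue.Empty. The helper name drain is illustrative, and this approach is only reasonable when the producer has already finished, since an empty queue does not otherwise mean that no more items are coming.

```python
import queue
from multiprocessing import Queue


def drain(q, timeout=1.0):
    """Get items until the queue stays empty for `timeout` seconds."""
    items = []
    while True:
        try:
            items.append(q.get(block=True, timeout=timeout))
        except queue.Empty:
            # Only the "nothing arrived in time" case ends the loop;
            # any other exception propagates and stays visible.
            break
    return items


if __name__ == "__main__":
    q = Queue()
    for i in range(5):
        q.put(i)
    print(drain(q))
```

With this version, an unexpected error such as NotImplementedError would surface as a traceback instead of being mistaken for an empty queue.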