Reputation: 325
I have a function (A
) that creates data at a constant rate, let's say 100 per second. I want to run another function (B
) on the data created by A
as it comes out. Function B
may take longer than 0.01s
to run, but I don't want that to back up the dataflow. Should I create a Pool
of B
and just pass a common Queue
into A
and B
to use (like code below)? I also saw that you're supposed to use Pool
s to process lists of data. Is this how they're supposed to be used (in regards to the method I described)? Should I just use two Process
s and alternate sending data to them?
def A(queue):
while True:
data = data_getter()
queue.put(data)
def B(queue):
while True:
data = queue.get(True):
do_something(data)
# main.py
q = Queue()
pool = Pool(initializer=B, initargs=[q])
A(q)
Upvotes: 1
Views: 470
Reputation: 44283
Here is my short answer:
A process pool's purpose for existence is to allow you to process N "jobs" in a parallel fashion to the fullest degree possible given that you have been allocated M physical processors to this task.
Creating a queue that a Process
instance is writing to N times (which is equivalent to submitting N "jobs") and having M Process
instances reading and processing these meessages, i.e. "jobs", and processing them, is in effect an implementation of a type of process pool. To use a separate process pool just to create the processes needed be the reader processes of the queue seems like an unnecessary layer of complexity. So I would create M Process
instances that read from a common queue that the writer process add messages to.
TL;DR (or the long answer)
As you have rightly surmised you can do it with (1) creating individual Process
instances or (2) by using a process pool. Method 1 seems intuitively the most logical way of doing it, but it is not necessarily the most straightforward code. I present a few methods below using a simulation where the queue writer process create a queue entry once every .01 seconds but the queue reader process requires .06 seconds to process a queue entry so that at least 6 such processes (reading from a common queue) are required to keep up:
Method 1 -- Explicit Processes
import multiprocessing as mp
import time
class Sentinel():
pass
def a(queue, n_readers):
for i in range(1000):
time.sleep(.01)
queue.put(i)
print('queue size is now approximately: ', queue.qsize()) # print queue size
# signal readers to terminate:
end_of_queue = Sentinel()
for _ in range(n_readers):
queue.put(end_of_queue)
def b(queue):
while True:
value = queue.get(True)
# signal to terminate?
if isinstance(value, Sentinel):
break
print(value, flush=True)
time.sleep(.06)
def main():
n_readers = mp.cpu_count() - 1
queue = mp.Queue()
# create queue readers:
readers = [mp.Process(target=b, args=(queue,)) for _ in range(n_readers)]
for p in readers:
p.start()
# now start queue writer:
writer = mp.Process(target=a, args=(queue, n_readers))
writer.start()
# wait for writer to terminate:
writer.join()
for p in readers:
p.join()
print('Done')
if __name__ == '__main__':
main()
Method 2 - Using a Process Pool
import multiprocessing as mp
import time
class Sentinel():
pass
def init_pool(q):
global queue
queue = q
def a(n_readers):
for i in range(1000):
time.sleep(.01)
queue.put(i)
print('queue size is now approximately: ', queue.qsize()) # print queue size
end_of_queue = Sentinel()
for _ in range(n_readers):
queue.put(end_of_queue)
def b():
while True:
value = queue.get(True)
# signal to terminate?
if isinstance(value, Sentinel):
break
print(value, flush=True)
time.sleep(.06)
def main():
n_readers = mp.cpu_count() - 1
queue = mp.Queue()
pool = mp.Pool(n_readers + 1, initializer=init_pool, initargs=(queue,))
readers_results = [pool.apply_async(b) for _ in range(n_readers)]
# now submit writer:
pool.apply(a, args=(n_readers,))
# wait for readers to finish:
for r in readers_results:
r.get()
print('Done')
if __name__ == '__main__':
main()
The only advantage of the second method is that if it becomes necessary for workers a
and/or b
to return values back to the main process, it becomes simple when using process pools.
Note
Implementing your queue reader processes, function B
, by using the initializer
argument to the Pool
constructor is also doable (see Method Pool 2A below), but then function A
must run under the main process. But these Pool processes are daemon processes and will terminate as soon as all the non-daemon processes terminate. This is why I had arranged in Method 2 for the writing of the special sentinel messages to the queue as a signal for the "jobs" (but not the processes running the job) to terminate when the sentinel messages are read. I therefore know that when the job has completed that no more messages are on the queue and that there will never be any more messages on the queue. Similar logic applies to Method 1, except the entire process also terminates and I can use join
to know when that occurs. But in your case using implicitly daemon threads to perform the reading of the queues, even if you add additional code to add the sentinel values to the queue when all the input queue values have been read and the initializer function, B
, terminates, how does the main process know? Again, you can call method Pool.join()
on the pool, which prevents any future work from being submitted to the pool (we never actually explicitly submit work; all the work is being done in the pool initializer functions). And then you follow this up with a call to Pool.join()
, which waits for each worker process to exit. This will occur immediately as soon as the pool initializer function for each process instance completes since the previous call to Pool.close
tells the pool that there will never be any additional work added to the pool.
Method 2A - Using a Process Pool with a Pool Initializaer
import multiprocessing as mp
import time
class Sentinel():
pass
def a(queue, n_readers):
for i in range(1000):
time.sleep(.01)
queue.put(i)
end_of_queue = Sentinel()
for _ in range(n_readers):
queue.put(end_of_queue)
def b(the_queue):
global queue
queue = the_queue
while True:
value = queue.get(True)
# signal to terminate?
if isinstance(value, Sentinel):
break
print(value, flush=True)
time.sleep(.06)
def main():
n_readers = mp.cpu_count() - 1
queue = mp.Queue()
pool = mp.Pool(n_readers, initializer=b, initargs=(queue,))
a(queue, n_readers)
# wait for readers to finish:
pool.close() # must be called before pool.join()
pool.join()
print('Done')
if __name__ == '__main__':
main()
Notes
All three methods will work and all three pre-suppose that the reader process does not run indefinitely and so we are interested in an orderly termination (and hence the need for the sentinel values to signal termination to the reader processes). But if the writer process is designed to run indefinitely until the process is interrupted by the user, then for example, Method 2a can be modified to use a keyboard interrupt generated by the user entering ctrl-C, to terminate execution:
Modified Method 2A Terminated Only by Keyboard Interrupt
import multiprocessing as mp
import time
import itertools
def a(queue, n_readers):
try:
for i in itertools.count(0):
time.sleep(.01)
queue.put(i)
except KeyboardInterrupt:
pass
def b(the_queue):
global queue
queue = the_queue
try:
while True:
value = queue.get(True)
print(value, end=' ', flush=True)
time.sleep(.06)
except KeyboardInterrupt:
pass
def main():
n_readers = mp.cpu_count() - 1
queue = mp.Queue()
pool = mp.Pool(n_readers, initializer=b, initargs=(queue,))
a(queue, n_readers)
# wait for readers to finish:
pool.close() # must be called before pool.join()
try:
pool.join()
except KeyboardInterrupt:
pool.terminate()
print('Done')
if __name__ == '__main__':
main()
Modified Method 1 Terminated Only by Keyboard Input
import multiprocessing as mp
import time
import itertools
def a(queue, n_readers):
for i in itertools.count(0):
time.sleep(.01)
queue.put(i)
def b(queue):
while True:
value = queue.get(True)
if value % 100 == 0:
print(value, end=' ', flush=True)
time.sleep(.06)
def main():
n_readers = mp.cpu_count() - 1
queue = mp.Queue()
# create queue readers:
readers = [mp.Process(target=b, args=(queue,), daemon=True) for _ in range(n_readers)]
for p in readers:
p.start()
# now start queue writer:
writer = mp.Process(target=a, args=(queue, n_readers), daemon=True)
writer.start()
input('Enter return to terminate...')
print()
print('Done')
if __name__ == '__main__':
main()
Conclusion
You obviously have choices. If the program is not to run indefinitely and you wish an orderly shutdown being sure that all messages that have been enqueued have been processed, my preference would be Method 1. Methods 2 and 2a just seem to be lazy ways of getting N processes doing the same identical jobs with identical arguments for you.
On the other hand, if your writer process task runs endlessly and you need to terminate it and don't mind that there may be one or two unprocessed messages left on the queue (after all you are terminating the program at a rather arbitrarily point in time, so that shouldn't be a big deal), then if a simple input
statement suffices to input the command to terminate, Modified Method 1 seems the way to go requiring the fewest modifications. But if the running program is constantly outputting messages, the text displayed by the input
statement would get lost and you need to rely on using a keyboard interrupt handler for every process, which is more involved. You can use this technique if any of the modified examples; I have used it in Modified Method 2a as an example, since that code did not lend itself to using the input
statement technique because there were just too much terminal output. Undoubtedly, when there is any terminal output, the surest method is using keyboard handler interrupt handlers method. I would still favor using Method 1 and its variations instead of a process pool as long as there was no need to get return values back from any of the processes:
Upvotes: 1