Reputation: 311
I have a basic question regarding Python's multiprocessing module: how can different processes that use queues to transfer data be started optimally?
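For that I use a simple example with three steps: data is received (getData), features are calculated from it (calcFeatures), and the features are sent out (sendFeatures).
All of these steps should happen in parallel in three different processes.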
Here is the example code:
import multiprocessing
import keyboard
import time

def getData(queue_raw):
    for num in range(1000):
        queue_raw.put(num)
        print("getData: put "+ str(num)+" in queue_raw")
    while True:
        if keyboard.read_key() == "s":
            break

def calcFeatures(queue_raw, queue_features):
    while not queue_raw.empty():
        data = queue_raw.get()
        queue_features.put(data**2)
        print("calcFeatures: put "+ str(data**2)+" in queue_features")

def sendFeatures(queue_features):
    while not queue_features.empty():
        feature = queue_features.get()
        print("sendFeatures: put "+ str(feature)+" out")

if __name__ == "__main__":
    queue_raw = multiprocessing.Queue()
    queue_features = multiprocessing.Queue()
    processes = [
        multiprocessing.Process(target=getData, args=(queue_raw,)),
        multiprocessing.Process(target=calcFeatures, args=(queue_raw, queue_features,)),
        multiprocessing.Process(target=sendFeatures, args=(queue_features,))
    ]
    processes[0].start()
    time.sleep(0.1)
    processes[1].start()
    time.sleep(0.1)
    processes[2].start()
    #for p in processes:
    #    p.start()
    for p in processes:
        p.join()
This program works, but my question is regarding the start of the different processes.
Ideally, processes[1] should start only once processes[0] has put data in queue_raw, while processes[2] should start only once processes[1] has put the calculated features in queue_features.
Right now I do that with time.sleep(), which is suboptimal, since I don't necessarily know how long the processes will take.
I also tried something like:
processes[0].start()
while queue_raw.empty():
    time.sleep(0.5)
processes[1].start()
But it doesn't work, since only the first process is executed. Is there a way to start processes depending on each other like this?
Upvotes: 0
Views: 857
Reputation: 311
@moooeeeep's comment pointed out the problem.
Checking with while not queue.empty(): does not wait until data is actually in the queue!
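To illustrate the point, here is a minimal sketch of a consumer that relies on empty(). The helper name drain_until_empty is made up for this example and is not part of the question's code. If the queue has not been filled yet, the loop body never runs and the consumer returns right away instead of waiting:

```python
import multiprocessing


def drain_until_empty(q):
    """Consume items while q.empty() is False and return what was collected."""
    items = []
    while not q.empty():
        items.append(q.get())
    return items


if __name__ == "__main__":
    q = multiprocessing.Queue()
    # Nothing has been put yet, so the loop is skipped entirely
    # and the consumer finishes without ever seeing any data.
    print(drain_until_empty(q))
```

This is most likely why the time.sleep() calls seemed necessary: without them, a consumer can reach its empty() check before the producer has put anything in the queue.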
An approach via a sentinel object (here None) and a while True loop will enforce that the process waits until the other processes put data in the queue:
FLAG_STOP = False
while not FLAG_STOP:
    data = queue_raw.get()  # get() blocks until an item is available
    if data is None:
        # Sentinel received: finish analysis
        FLAG_STOP = True
    else:
        # Work with data
        pass
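Applied to all three stages, this could look roughly like the sketch below. It assumes that None never occurs as real data, leaves out the keyboard handling from the question, and uses names I made up for illustration (get_data, calc_features, send_features, SENTINEL, and the n_items parameter):

```python
import multiprocessing

SENTINEL = None  # assumption: None is never a valid data item


def get_data(queue_raw, n_items):
    for num in range(n_items):
        queue_raw.put(num)
    queue_raw.put(SENTINEL)  # tell the next stage that no more data follows


def calc_features(queue_raw, queue_features):
    while True:
        data = queue_raw.get()  # blocks until an item is available
        if data is SENTINEL:
            queue_features.put(SENTINEL)  # pass the stop signal downstream
            break
        queue_features.put(data ** 2)


def send_features(queue_features):
    sent = 0
    while True:
        feature = queue_features.get()  # blocks until an item is available
        if feature is SENTINEL:
            break
        print("send_features: put " + str(feature) + " out")
        sent += 1
    return sent  # number of features sent (ignored when run as a Process)


if __name__ == "__main__":
    queue_raw = multiprocessing.Queue()
    queue_features = multiprocessing.Queue()
    processes = [
        multiprocessing.Process(target=get_data, args=(queue_raw, 10)),
        multiprocessing.Process(target=calc_features, args=(queue_raw, queue_features)),
        multiprocessing.Process(target=send_features, args=(queue_features,)),
    ]
    for p in processes:
        p.start()  # no sleep needed: each consumer blocks in get()
    for p in processes:
        p.join()
```

Because every consumer blocks in get() until data or the sentinel arrives, the processes can be started in any order and without time.sleep(). With several consumers reading from the same queue, one sentinel per consumer would be needed.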
Upvotes: 1