Merk

Python multiprocessing: How to start processes that depend on each other?

I have a basic question about Python multiprocessing: how should several processes that use queues to pass data to each other best be started?

As an example, I use a simple pipeline in which:

  1. Data is received
  2. Data is processed
  3. Data is sent

All of these steps should run in parallel, each in its own process.

Here is the example code:

import multiprocessing
import keyboard
import time

def getData(queue_raw):
    for num in range(1000):
        queue_raw.put(num)
        print("getData: put "+ str(num)+" in queue_raw")
    while True:
        if keyboard.read_key() == "s":
            break

def calcFeatures(queue_raw, queue_features):
    while not queue_raw.empty():
        data = queue_raw.get()
        queue_features.put(data**2)
        print("calcFeatures: put "+ str(data**2)+" in queue_features")

def sendFeatures(queue_features):
    while not queue_features.empty():
        feature = queue_features.get()
        print("sendFeatures: put "+ str(feature)+" out")

if __name__ == "__main__":

    queue_raw = multiprocessing.Queue()
    queue_features = multiprocessing.Queue()

    processes = [

        multiprocessing.Process(target=getData, args=(queue_raw,)),
        multiprocessing.Process(target=calcFeatures, args=(queue_raw, queue_features,)),
        multiprocessing.Process(target=sendFeatures, args=(queue_features,))
    ]

    processes[0].start()
    time.sleep(0.1)
    processes[1].start()
    time.sleep(0.1)
    processes[2].start()

    #for p in processes:
    #    p.start()
    for p in processes:
        p.join()

This program works, but my question is about how the processes are started. Ideally, processes[1] should only start once processes[0] has put data into queue_raw, and processes[2] should only start once processes[1] has put the calculated features into queue_features.

Right now I do this with time.sleep(), which is suboptimal because I don't know in advance how long each process will take. I also tried something like:

processes[0].start()
while queue_raw.empty():
    time.sleep(0.5)
processes[1].start()

But this doesn't work, since it only handles the start of the first process. Is there a way to start processes that depend on each other like this?

Answers (1)

Merk

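@moooeeeep pointed out the real problem in a comment: checking with while not queue.empty(): does not wait until data is actually in the queue! If the queue happens to be empty when the loop condition is evaluated (for example because the upstream process has not produced anything yet), the loop simply ends.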
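
To make this concrete, here is a small sketch using the same while not q.empty() pattern as calcFeatures and sendFeatures in the question (the helper name drain_with_empty_check is hypothetical). When it runs before any producer has put data, it returns immediately with nothing instead of waiting. The Python documentation also warns that empty() on a multiprocessing queue is not reliable, so it is a poor signal for "the producer is done".

```python
import multiprocessing


def drain_with_empty_check(q):
    # Same pattern as the question's worker loops: if the queue happens to be
    # empty at this moment, the loop body never runs and the function returns.
    items = []
    while not q.empty():
        items.append(q.get())
    return items


if __name__ == "__main__":
    q = multiprocessing.Queue()
    # No producer has put anything yet, so the "consumer" gives up at once
    # instead of waiting for data.
    print(drain_with_empty_check(q))  # prints []
```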

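Using a sentinel object (here None) together with a loop around the blocking queue.get() makes a process wait until another process has actually put data into the queue. The producer puts None into the queue once it has no more data, and the consumer stops when it receives it: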

FLAG_STOP = False
while not FLAG_STOP:
    data = queue_raw.get()  # get() blocks until an item is available
    if data is None:
        # Sentinel received: finish analysis
        FLAG_STOP = True
    else:
        # work with data
        pass
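
Putting this together, a minimal sketch of the question's pipeline rewritten with None sentinels might look like the following. It assumes getData simply stops after its 1000 items instead of waiting for the "s" key (the keyboard dependency is dropped for brevity), and that calcFeatures forwards the sentinel so sendFeatures also knows when to stop. Because every stage blocks in get(), all three processes can be started at once, without any time.sleep().

```python
import multiprocessing


def getData(queue_raw):
    # Producer: put the raw data, then None as a sentinel meaning "no more data".
    # Assumption: the keyboard-driven wait from the question is left out.
    # None works as a sentinel across processes because it is a singleton that
    # survives pickling; a fresh object() would not match with `is` afterwards.
    for num in range(1000):
        queue_raw.put(num)
    queue_raw.put(None)


def calcFeatures(queue_raw, queue_features):
    # get() blocks until an item arrives, so this stage may start "too early".
    while True:
        data = queue_raw.get()
        if data is None:
            # Forward the sentinel so the next stage also knows when to stop.
            queue_features.put(None)
            break
        queue_features.put(data ** 2)


def sendFeatures(queue_features):
    # Consumer: "send" (here: print) each feature until the sentinel arrives.
    sent = []  # returned only so the stage can be tested without processes
    while True:
        feature = queue_features.get()
        if feature is None:
            break
        print("sendFeatures: put " + str(feature) + " out")
        sent.append(feature)
    return sent


if __name__ == "__main__":
    queue_raw = multiprocessing.Queue()
    queue_features = multiprocessing.Queue()

    processes = [
        multiprocessing.Process(target=getData, args=(queue_raw,)),
        multiprocessing.Process(target=calcFeatures, args=(queue_raw, queue_features)),
        multiprocessing.Process(target=sendFeatures, args=(queue_features,)),
    ]

    # No sleeps needed: a stage that starts before its input exists
    # simply waits inside get().
    for p in processes:
        p.start()
    for p in processes:
        p.join()
```

With this design the start order no longer matters: a downstream stage that starts first just waits until its upstream stage produces something, and the sentinel travelling down the pipeline lets every process finish so join() returns.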
