Mohammad

Reputation: 113

Get error flag/message from a queued process in Python multiprocessing

I am building a Python multiprocessing tool that uses Process and Queue. A producer process puts commands on the queue, and consumer processes pick them up and run another script in parallel. As a sanity check, I want to detect whether an error happened in that other script and return a flag/message if so (status = os.system(...) runs the script, and a non-zero status signals an error). But I can't pass the error flag from the queue/child in the consumer process back to the parent process. Here are the main parts of my code (shortened):

import os
import time
from multiprocessing import Process, Queue, Lock

command_queue = Queue()
lock = Lock()

consumers = []
p = Process(target=producer, args=(command_queue, lock, test_config_list_path))
for i in range(consumer_num):
    c = Process(target=consumer, args=(command_queue, lock))
    consumers.append(c)

p.daemon = True
p.start()

for c in consumers:
    c.daemon = True
    c.start()

p.join()
for c in consumers:
    c.join()

if error_flag:
    Stop_this_process_and_send_a_message!



def producer(queue, lock, test_config_list_path):
    for config_path in test_config_list_path:
        queue.put((config_path, process_to_be_queued))



def consumer(queue, lock):
    while True:
        elem = queue.get()
        if elem is None:
            return
        status = os.system(elem[1])
        if status:
            error_flag = 1
    time.sleep(3)

Now I want to get that error_flag and use it in the main code to handle things. But it seems I can't pass error_flag from the consumer (child) processes back to the main part of the code. I'd appreciate it if someone could help with this.

Upvotes: 0

Views: 754

Answers (3)

Booboo

Reputation: 44283

Given your update, I would also pass a multiprocessing.Event instance to your to_do process. This lets the main process simply wait on the event, which blocks until set is called on it. When to_do or one of its threads detects a script error, it should set error_flag.value to True and then call set on the event. That wakes up the main process, which can then call terminate on the child process, which does what you want. On normal completion of to_do it is still necessary to call set on the event, since the main process blocks until the event has been set; in that case the main process just calls join on the process.

Using a multiprocessing.Value instance alone would have required periodically checking its value in a loop, so I think waiting on a multiprocessing.Event is better. I have also made a couple of other updates to your code with comments, so please review them:

import multiprocessing
from ctypes import c_bool
...

def to_do(event, error_flag):
    # Run the tests
    wrapper_threads.main(event, error_flag)
    # on error or normal process completion:
    event.set()

def git_pull_change(path_to_repo):

    repo = Repo(path_to_repo)
    current = repo.head.commit

    repo.remotes.origin.pull()
    if current == repo.head.commit:
        print("Repo not changed. Sleep mode activated.")
        # Call to time.sleep(some_number_of_seconds) should go here, right?
        return False
    else:
        print("Repo changed. Start running the tests!")
        return True

def main():
    while True:
        status = git_pull_change(git_path)
        if status:
            # The repo was just pulled, so no point in doing it again:
            #repo = Repo(git_path)
            #repo.remotes.origin.pull()
            event = multiprocessing.Event()
            error_flag = multiprocessing.Value(c_bool, False, lock=False)
            process = multiprocessing.Process(target=to_do, args=(event, error_flag))
            process.start()
            # wait for an error or normal process completion:
            event.wait()
            if error_flag.value:
                print('Error! breaking the process!!!!!!!!!!!!!!!!!!!!!!!')
                process.terminate() # Kill the process
            else:
                process.join()
            break
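
For completeness, the error-detecting side inside to_do would look something like this. This is only a sketch: wrapper_threads.main is not shown in the question, so run_scripts and its commands argument are hypothetical stand-ins for whatever actually launches the test scripts:

import os

def run_scripts(event, error_flag, commands):
    # Hypothetical stand-in for the body of wrapper_threads.main.
    for cmd in commands:
        status = os.system(cmd)      # non-zero exit status means the script failed
        if status:
            error_flag.value = True  # record the failure in shared memory first...
            event.set()              # ...then wake the waiting main process
            return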

Upvotes: 1

Mohammad

Reputation: 113

The code summary above is the inner process that runs some tests in parallel. I removed the function-definition part from it; just assume that is the wrapper_threads in the following code summary. Here I'll add the parent process, which checks a variable (let's assume a commit in my git repo). The following process is meant to run indefinitely, and when there is a change it triggers the multiprocessing in the main question:

def to_do():
    # Run the tests
    wrapper_threads.main()


def git_pull_change(path_to_repo):

    repo = Repo(path_to_repo)
    current = repo.head.commit

    repo.remotes.origin.pull()
    if current == repo.head.commit:
        print("Repo not changed. Sleep mode activated.")
        return False
    else:
        print("Repo changed. Start running the tests!")
        return True

def main():
    process = None
    while True:
        status = git_pull_change(git_path)

        if status:
            repo = Repo(git_path)
            repo.remotes.origin.pull()
            process = multiprocessing.Process(target=to_do)
            process.start()

        if error_flag.value:
            print('Error! breaking the process!!!!!!!!!!!!!!!!!!!!!!!')
            os.system('pkill -U user XXX')
            break

Now I want to propagate that error_flag from the child process to this process and stop process XXX. The problem is that I don't know how to bring that error_flag to this (grand)parent process.

Upvotes: 0

Booboo

Reputation: 44283

You should always tag multiprocessing questions with the platform you are running on. Since I do not see your process-creating code within an if __name__ == '__main__': block, I have to assume you are running on a platform that uses OS fork calls to create new processes, such as Linux.

That means your newly created processes inherit the value of error_flag when they are created, but for all intents and purposes, if a process modifies this variable, it is modifying a local copy that exists in an address space unique to that process.
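
A minimal demonstration of that pitfall (a sketch; the names are illustrative):

from multiprocessing import Process

error_flag = 0

def worker():
    global error_flag
    error_flag = 1  # modifies only this child's copy of the variable

if __name__ == '__main__':
    p = Process(target=worker)
    p.start()
    p.join()
    print(error_flag)  # prints 0: the parent's copy was never touched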

You need to create error_flag in shared memory and pass it as an argument to your process:

from multiprocessing import Value
from ctypes import c_bool
...
error_flag = Value(c_bool, False, lock=False)
for i in range(consumer_num):
    c = Process(target=consumer, args=(command_queue, lock, error_flag))
    consumers.append(c)
...

if error_flag.value:
    ...
    #Stop_this_process_and_send_a_message!




def consumer(queue, lock, error_flag):
    while True:
        elem = queue.get()
        if elem is None:
            return
        status = os.system(elem[1])
        if status:
            error_flag.value = True
    time.sleep(3)

But I have a few questions/comments for you. You have the following statement in your original code:

if error_flag:
    Stop_this_process_and_send_a_message!

But this statement is located after you have already joined all the started processes. So what processes are there to stop, and where are you sending a message to? You potentially have multiple consumers, any of which might be setting error_flag (by the way, there is no need to do that under a lock, since setting the value to True is an atomic action). And since you are joining all your processes, i.e. waiting for them to complete, I am not sure why you are making them daemon processes. You are also passing a Lock instance to your producer and consumers, but it is not being used at all.

Your consumers return when they get a None record from the queue. So if you have N consumers, the last N elements put on the queue (after everything from test_config_list_path) need to be None.

I also see no need for having the producer process. The main process could just as well write all the records to the queue either before or even after it starts the consumer processes.
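
Putting those last two points together, the main process could enqueue the work items and one None sentinel per consumer itself; a sketch reusing the names from your question:

for config_path in test_config_list_path:
    command_queue.put((config_path, process_to_be_queued))
for _ in range(consumer_num):
    command_queue.put(None)  # one sentinel per consumer so each one returns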

The call to time.sleep(3) you have at the end of function consumer is unreachable.

Upvotes: 1
