Joachim

Reputation: 499

Python: multiprocessing Queue.put() in module won't send anything to parent process

I am trying to make two processes communicate with each other using Python's multiprocessing package, specifically the Queue class. From the parent process, I want to get an updated value from the child process every 5 seconds. The child process runs a method of a class. I have written a toy example in which everything works fine.

However, when I apply this approach to my project, the Queue.put() call made by the child process in the submodule does not seem to send anything to the parent process: the parent never prints the desired value, and the code never stops running. In fact, the parent process only prints the value that it sent to the child process itself (True here) and, as I said, never stops.

So my questions are:

  1. Is there any error in my toy example?

  2. How should I modify my project to get it working just like my toy example?

Toy example: works

main module

from multiprocessing import Process, Event, Lock, Queue, Pipe
import time 
import test_mod as test

def loop(output):
    stop_event = Event()
    q = Queue()
    child_process = Process(target=test.child.sub, args=(q,))
    child_process.start()
    i = 0
    print("started at {} ".format(time.time()))

    while not stop_event.is_set():
        i+=1
        time.sleep(5)
        q.put(True)
        print(q.get())
        if i == 5:
            child_process.terminate()
            stop_event.set()

    output.put("main process looped")

if __name__ == '__main__':
    stop_event, output = Event(), Queue()
    k = 0
    while k < 5:
        loop_process = Process(target=loop, args=(output,))
        loop_process.start()
        print(output.get())
        loop_process.join()
        k+=1

submodule

from multiprocessing import Process, Event, Lock, Queue, Pipe
import time


class child(object):
    def __init__(self):
        pass

    def sub(q):
        i = 0
        while i < 2000:
            latest_value = time.time()
            accord = q.get()
            if accord == True:
                q.put(latest_value)
            accord = False
            time.sleep(0.0000000005)
            i+=1

Project code: doesn't work

main module

import neat #package in which the submodule is 
import *some other stuff*

def run(config_file):

    config = neat.Config(some configuration)

    p = neat.Population(config)

    **WHERE MY PROBLEM IS**

    stop_event = Event()
    q = Queue()
    pe = neat.ParallelEvaluator(**args)

    child_process = Process(target=p.run, args=(pe.evaluate, q, other args))
    child_process.start()

    i = 0
    while not stop_event.is_set():

        q.put(True)
        print(q.get())
        time.sleep(5)
        i += 1
        if i == 5:
            child_process.terminate()
            stop_event.set()

if __name__ == '__main__':
    run(config_file)

submodule

class Population(object):
    def __init__():
      *initialization*

    def run(self, q, other args):

        while n is None or k < n:
            *some stuff*
            accord = add_2.get()
            if accord == True:
                add_2.put(self.best_genome.fitness)
            accord = False

        return self.best_genome

NB:

  1. I am not used to multiprocessing

  2. I have tried to give the most relevant parts of my project, given that the entire code would be far too long.

  3. I have also considered using Pipe(), however this option didn't work either.

Upvotes: 0

Views: 1184

Answers (1)

RaJa

Reputation: 1567

If I read it correctly, the submodule you want to run is the class Population. However, you start your process with a parameter of type ParallelEvaluator. Also, I can't see where you pass your Queue q to the subprocess. This is what I see in the code provided:

stop_event = Event()
q = Queue()
pe = neat.ParallelEvaluator(**args)

child_process = Process(target=p.run, args=(pe.evaluate, **args))
child_process.start()

Moreover, the following lines create a race condition:

q.put(True)
print(q.get())
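
As a minimal sketch of the problem (the helper name parent_round_trip is hypothetical, not taken from your code): with a single queue, if no child reads from it in time, the parent simply reads back its own request.

```python
from multiprocessing import Queue


def parent_round_trip(q):
    """Put a request on q, then immediately read from the same q."""
    q.put(True)
    # Nothing else is reading from q here, so this get() pops the
    # parent's own request instead of a reply from a child.
    return q.get(timeout=5)


if __name__ == "__main__":
    print(parent_round_trip(Queue()))  # prints True
```

This matches the symptom in the question, where the parent only prints True.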

The get call works like a pop: it takes an element and removes it from the queue. If your subprocess doesn't read from the queue between these two lines (because it is busy), the parent's own get() takes the True back, so it never reaches the child process. Hence, it is better to use multiple queues, one for each direction. Something like:

stop_event = Event()
q_in = Queue()
q_out = Queue()
pe = neat.ParallelEvaluator(**args)

child_process = Process(target=p.run, args=(pe.evaluate, q_in, q_out, **args))  # pass both queues to the child
child_process.start()

i = 0
while not stop_event.is_set():

    q_in.put(True)
    print(q_out.get())
    time.sleep(5)
    i += 1
    if i == 5:
        child_process.terminate()
        stop_event.set()

And this is your submodule:

class Population(object):
    def __init__(self):
      *initialization*

    def run(self, fitness_function, add_2, add_3, **args):

        while n is None or k < n:
            *some stuff*
            accord = add_2.get()           # add_2 = q_in
            if accord == True:
                add_3.put(self.best_genome.fitness)  #add_3 = q_out
            accord = False

        return self.best_genome
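
For reference, here is a small self-contained sketch of the two-queue pattern that can be run without neat. The names worker and poll, the None stop sentinel, and the use of time.time() as a stand-in for self.best_genome.fitness are all assumptions made for illustration; they are not part of your project or of the neat API.

```python
from multiprocessing import Process, Queue
import time


def worker(q_in, q_out):
    """Child loop: answer each request with the latest value; stop on None."""
    while True:
        request = q_in.get()        # blocks until the parent sends something
        if request is None:         # hypothetical stop sentinel
            break
        latest_value = time.time()  # stand-in for self.best_genome.fitness
        q_out.put(latest_value)     # replies travel on their own queue


def poll(n_requests=3):
    """Parent side: send n_requests requests and collect the replies."""
    q_in, q_out = Queue(), Queue()
    child = Process(target=worker, args=(q_in, q_out))
    child.start()
    replies = []
    for _ in range(n_requests):
        q_in.put(True)                         # request goes one way...
        replies.append(q_out.get(timeout=10))  # ...the reply comes back the other
    q_in.put(None)                             # ask the child to exit cleanly
    child.join()
    return replies


if __name__ == "__main__":
    print(poll())
```

Because requests and replies use separate queues, the parent can no longer consume its own True. Note that a blocking get() in the child makes every iteration wait for the parent; if the child should keep working between requests, get_nowait() with a queue.Empty handler is one alternative.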

Upvotes: 1
