thepero

Reputation: 13

Subprocess in Python fails to terminate when Queue is utilized

First of all, I am very new to multiprocessing. I am trying to implement a simple camera simulator that generates images in a subprocess and puts them in a queue for another subprocess to process. The camera simulator subprocess (camera) hangs when I call camera.join() to finalize the program.

import multiprocessing
import queue
import time
import numpy as np

class CameraSimulator(multiprocessing.Process):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue
        self.connected = multiprocessing.Event()
        self.acquiring = multiprocessing.Event()

    def run(self):
        while self.connected.is_set():
            while self.acquiring.is_set():
                image = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
                try:
                    self.queue.put(image)
                except queue.Full:
                    continue
                time.sleep(0.1)

    def connect(self):
        self.connected.set()
        self.start()

    def disconnect(self):
        self.connected.clear()

    def acquire(self):
        self.acquiring.set()

    def stop(self):
        self.acquiring.clear()

if __name__ == "__main__":

    image_queue = multiprocessing.Queue(maxsize=1000)
    camera = CameraSimulator(image_queue)

    print("Connect camera")
    camera.connect()

    print("Start camera acquisition")
    camera.acquire()
    time.sleep(2)
    print("Stop camera acquisition")
    camera.stop()

    time.sleep(2)

    print("Start camera acquisition again")
    camera.acquire()
    time.sleep(2)
    print("Stop camera acquisition")
    camera.stop()

    print("Disconnect camera")
    camera.disconnect()

    print("Draining queue")
    while not image_queue.empty():
        try:
            image_queue.get_nowait()
        except queue.Empty:
            break
    print("Queue drained")

    camera.join()
    print("Camera process terminated")

It seems that the problem might be that the queue is not empty. That's why I deliberately tried to drain the queue, but the problem still persists. Could it be that calling empty() is not a reliable way to check whether a queue is empty, due to multithreading/multiprocessing semantics? I could use camera.terminate() to forcefully terminate the subprocess, but I assume that is not good practice. Any help would be greatly appreciated!

Upvotes: 1

Views: 66

Answers (1)

Ahmed AEK

Reputation: 17840

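Python blocks the child process from exiting until everything it has put on the queue is flushed, but the queue is "double buffered" (which is why put is non-blocking): put appends to a buffer inside the child, and a background feeder thread writes that buffer to a pipe. Your loop only drains what is currently readable from the pipe, not what is still sitting in the child's buffer, so empty() can return True while the child still has items to send. The queue needs to be drained continuously until the child terminates.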
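To make the buffering concrete, here is a minimal sketch of the hang, separate from the camera code. The names producer and demo are made up for illustration, and the 10 MB payload is just an assumption chosen to be far larger than an OS pipe buffer.

```python
import multiprocessing


def producer(q):
    # put() returns right away: the payload goes into the child's internal
    # buffer, and a feeder thread then tries to write it to the pipe.
    q.put(b"x" * 10_000_000)


def demo():
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(q,), daemon=True)
    p.start()

    # Nobody is reading yet, so the child cannot finish flushing and exit.
    p.join(timeout=1.0)
    alive_before = p.is_alive()  # expected True

    # Reading the item empties the pipe and lets the feeder thread finish.
    item = q.get()
    p.join(timeout=10.0)
    alive_after = p.is_alive()   # expected False
    return alive_before, len(item), alive_after


if __name__ == "__main__":
    print(demo())
```

The first join times out even though producer has already returned, which is the same situation as camera.join() in the question.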

You can just have a thread drain the queue while you are joining the worker process.

import queue
import time
import numpy as np
import multiprocessing
import threading

class CameraSimulator(multiprocessing.Process):
    def __init__(self, queue):
        # if an exception happens, don't leave a zombie process
        super().__init__(daemon=True)
        self.queue = queue
        self.connected = multiprocessing.Event()
        self.acquiring = multiprocessing.Event()


    def run(self):
        while self.connected.is_set():
            while self.acquiring.is_set():
                image = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
                try:
                    self.queue.put(image)
                except queue.Full:
                    continue  # <---- BUG! skips the sleep (and put() blocks by default, so queue.Full is never raised here)
                time.sleep(0.1)

    def connect(self):
        self.connected.set()
        self.start()

    def disconnect(self):
        self.connected.clear()

    def acquire(self):
        self.acquiring.set()

    def stop(self):
        self.acquiring.clear()

if __name__ == "__main__":

    image_queue = multiprocessing.Queue(maxsize=1000)
    camera = CameraSimulator(image_queue)

    print("Connect camera")
    camera.connect()

    print("Start camera acquisition")
    camera.acquire()
    time.sleep(2)
    print("Stop camera acquisition")
    camera.stop()

    time.sleep(2)

    print("Start camera acquisition again")
    camera.acquire()
    time.sleep(2)
    print("Stop camera acquisition")
    camera.stop()

    print("Disconnect camera")
    camera.disconnect()

    print("Draining queue")
    items = []
    drainer_stop = threading.Event()
    def drainer():
        while not drainer_stop.is_set():
            try:
                items.append(image_queue.get(timeout=0.001))
            except queue.Empty:
                pass


    drainer_thread = threading.Thread(target=drainer, args=[], daemon=True)
    drainer_thread.start()

    print("joining process")
    camera.join()
    print("Camera process terminated")

    drainer_stop.set()
    drainer_thread.join()

    print("Queue drained")
    print(f"got {len(items)} items!")

Output:

Connect camera
Start camera acquisition
Stop camera acquisition
Start camera acquisition again
Stop camera acquisition
Disconnect camera
Draining queue
joining process
Camera process terminated
Queue drained
got 38 items!
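The same idea can probably be done without a helper thread, by reading from the queue in the main thread for as long as the child is alive. This is only a sketch: join_and_drain and producer are hypothetical names, and producer is a stand-in for the camera process.

```python
import multiprocessing
import queue


def producer(q, n_items):
    # Hypothetical stand-in for the camera: queue n_items payloads, then return.
    for _ in range(n_items):
        q.put(b"x" * 100_000)


def join_and_drain(proc, q, poll=0.05):
    """Read from q until proc has exited, then collect leftovers and join."""
    items = []
    # Keep reading while the child is alive so its feeder thread can flush.
    while proc.is_alive():
        try:
            items.append(q.get(timeout=poll))
        except queue.Empty:
            pass
    # The child has exited, so whatever it sent is already in the pipe.
    while True:
        try:
            items.append(q.get(timeout=poll))
        except queue.Empty:
            break
    proc.join()
    return items


if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(q, 50), daemon=True)
    p.start()
    print(f"got {len(join_and_drain(p, q))} items")
```

The trade-off is that the main thread is busy until the child exits, whereas the thread version leaves it free to do other work before joining.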

If you don't want this behavior, you can probably just terminate the child process, but any data still in the queue could be lost, which I don't think you care about anyway.
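If losing the remaining data is acceptable, one way to sketch the terminate route is to try a normal join with a timeout first and only terminate when the child is still alive. stop_process and producer are hypothetical names, and the one second timeout is an arbitrary assumption.

```python
import multiprocessing


def producer(q):
    # Hypothetical worker: queues about 8 MB and returns without a consumer.
    for _ in range(8):
        q.put(b"x" * 1_000_000)


def stop_process(proc, timeout=1.0):
    """Try a normal join first; terminate if the child is still alive."""
    proc.join(timeout)
    if proc.is_alive():
        # Undelivered items are lost. The multiprocessing docs also warn that
        # a queue used by a terminated process may become corrupted, so do
        # not keep reading from it afterwards.
        proc.terminate()
        proc.join()
        return "terminated"
    return "joined"


if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(q,), daemon=True)
    p.start()
    print(stop_process(p))
```

Here nothing reads the queue, so the join is expected to time out and the child gets terminated.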

Upvotes: 0
