Rinkeby

Reputation: 13

Implementing multiple producers and multiple consumers results in deadlock

I have been trying to implement a multiple-producer, multiple-consumer model using multiprocessing in Python. Producers scrape data from the web and consumers process that data. At first I just implemented two functions, producer and consumer, with their particular functionality and used a Queue to communicate between them, but I couldn't figure out how to handle the completion event. Then I implemented the model using a Semaphore:

from multiprocessing import Process, Queue, JoinableQueue, Semaphore, cpu_count

def producer(RESP_q, URL_q, SEM):
    # Hold the semaphore while this producer is running.
    with SEM:
        while True:
            url = URL_q.get()

            if url == "END": 
                break

            RESP = produce_txns(url)
            RESP_q.put(RESP)

def consumer(RESP_q, SEM, NP):
    # Keep consuming while any producer still holds the semaphore
    # or responses remain in the queue.
    while SEM.get_value() < NP or not RESP_q.empty():
        resp = RESP_q.get()

        for txn in resp:
            _txn = E_Transaction(txn)
            print(_txn)
 
        RESP_q.task_done()


class Manager:
    def __init__(self):
        self.URL_q = Queue()
        self.RESP_q = JoinableQueue()
        self.max_processes = cpu_count()
        self.SEM = Semaphore(self.max_processes // 2)

    def start(self):
        self.worker = []

        for i in range(0, self.max_processes, 2):
            self.worker.append(Process(target=producer, args=(self.RESP_q, self.URL_q, self.SEM)))
            self.worker.append(Process(target=consumer, args=(self.RESP_q, self.SEM, self.max_processes // 2)))
        
        url_server(self.URL_q, self.max_processes // 2)
        # URL_q now holds [*data, *(["END"] * (self.max_processes // 2))],
        # i.e. one "END" sentinel per producer.

        for worker in self.worker:
            worker.start()

        self.stop()

    def stop(self):
        for worker in self.worker:
            worker.join()

        self.RESP_q.join()
        self.RESP_q.close()
        self.URL_q.close()
        

Manager().start()

This implementation fails in the consumer: when RESP_q is empty and SEM is close to max_processes, the while condition is still satisfied, but by the time get() is called SEM has reached max_processes, no producers are left, and nothing will ever be put on the queue, so the program blocks forever at the get method. I am not able to solve this problem.

Edit 1.

@Louis Lac's implementation is also correct. I corrected my code to remove the deadlock using a try-except block.

def consumer(RESP_q, SEM, NP):
    while SEM.get_value() < NP or not RESP_q.empty():
        try:
            resp = RESP_q.get(timeout=0.5)
        except Exception:
            continue
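
For reference, the full corrected consumer might look like the following (a sketch, assuming the processing loop stays the same as in the original; catching queue.Empty instead of a bare Exception is slightly more precise, since that is what get(timeout=...) raises on a timeout):

from queue import Empty  # multiprocessing queues raise queue.Empty on timeout

def consumer(RESP_q, SEM, NP):
    while SEM.get_value() < NP or not RESP_q.empty():
        try:
            resp = RESP_q.get(timeout=0.5)
        except Empty:
            # Nothing arrived within the timeout; re-check the loop
            # condition instead of blocking forever.
            continue

        for txn in resp:
            _txn = E_Transaction(txn)
            print(_txn)

        RESP_q.task_done()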

Upvotes: 1

Views: 847

Answers (1)

Louis Lac

Reputation: 6396

Here is an example of a multiple-consumers, multiple-producers implementation. You can use the daemon flag when instantiating the processes so that they are automatically closed when the program quits. You can then use JoinableQueues and join them (instead of joining the processes) so that the program quits when there are no items left to process.

You should use an if __name__ == "__main__": guard to launch the program without causing a recursive execution of that program.

from multiprocessing import Process, JoinableQueue
from time import sleep


def consumer(in_queue: JoinableQueue, out_queue: JoinableQueue):
    # First stage: take numbers from in_queue, convert them to
    # strings, and forward them to out_queue.
    while True:
        item = in_queue.get()
        sleep(0.5)
        s = str(item)
        out_queue.put(s)
        in_queue.task_done()

def producer(in_queue: JoinableQueue):
    # Second stage: take strings from in_queue, convert them back
    # to ints, and print them.
    while True:
        item = in_queue.get()
        sleep(0.5)
        n = int(item)
        print(n)
        in_queue.task_done()

if __name__ == "__main__":
    number_queue = JoinableQueue()
    str_queue = JoinableQueue()

    for _ in range(4):
        Process(target=consumer, args=(number_queue, str_queue), daemon=True).start()
        Process(target=producer, args=(str_queue,), daemon=True).start()

    for i in range(100):
        number_queue.put(i)

    number_queue.join()
    str_queue.join()
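
Note that with this pattern the worker loops never exit on their own; shutdown relies entirely on the daemon flag. Once both number_queue.join() and str_queue.join() return (i.e. every put() has been matched by a task_done()), the main process exits and the daemon workers are terminated with it.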

Upvotes: 1
