I have been trying to implement a multiple-producer, multiple-consumer model using multiprocessing in Python. Producers scrape data from the web and consumers process the data. At first I just implemented two functions, producer and consumer, with the particular functionality and used a Queue to communicate between them, but I couldn't figure out how to handle the completion event. Then I implemented the model using a semaphore:
from multiprocessing import Process, Queue, JoinableQueue, Semaphore, cpu_count

# produce_txns, E_Transaction and url_server are defined elsewhere

def producer(RESP_q, URL_q, SEM):
    with SEM:
        while True:
            url = URL_q.get()
            if url == "END":  # sentinel: no more URLs to scrape
                break
            RESP = produce_txns(url)
            RESP_q.put(RESP)
def consumer(RESP_q, SEM, NP):
    while SEM.get_value() < NP or not RESP_q.empty():
        resp = RESP_q.get()
        for txn in resp:
            _txn = E_Transaction(txn)
            print(_txn)
        RESP_q.task_done()
class Manager:
    def __init__(self):
        self.URL_q = Queue()
        self.RESP_q = JoinableQueue()
        self.max_processes = cpu_count()
        self.SEM = Semaphore(self.max_processes // 2)

    def start(self):
        self.worker = []
        for i in range(0, self.max_processes, 2):
            self.worker.append(Process(target=producer, args=(self.RESP_q, self.URL_q, self.SEM)))
            self.worker.append(Process(target=consumer, args=(self.RESP_q, self.SEM, self.max_processes // 2)))
        url_server(self.URL_q, self.max_processes // 2)
        # Consider URL_q holds -> [*data, *["END"] * (self.max_processes // 2)]
        for worker in self.worker:
            worker.start()
        self.stop()

    def stop(self):
        for worker in self.worker:
            worker.join()
        self.RESP_q.join()
        self.RESP_q.close()
        self.URL_q.close()

Manager().start()
This implementation deadlocks when, in the consumer, RESP_q is empty but SEM's value is still just below NP at the moment the while condition is evaluated: by the time get() is called, the last producer has released the semaphore (so SEM's value now equals NP), no producers are left to put anything on the queue, and the program blocks forever inside get(). I am not able to solve this problem.
Edit 1.
@Louis Lac's implementation is also correct. I corrected my code to remove the deadlock using a try-except block:
def consumer(RESP_q, SEM, NP):
    while SEM.get_value() < NP or not RESP_q.empty():
        try:
            resp = RESP_q.get(timeout=0.5)
        except Exception:  # queue.Empty on timeout: loop back and re-check the exit condition
            continue
        for txn in resp:
            _txn = E_Transaction(txn)
            print(_txn)
        RESP_q.task_done()
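A race-free alternative is to drop the semaphore check entirely and reuse the "END" sentinel idea from the producers: the manager puts one sentinel per consumer on RESP_q once all producers have finished. A minimal sketch of the idea (not the code from the post above):

def consumer(RESP_q):
    while True:
        resp = RESP_q.get()
        if resp == "END":  # sentinel: all producers have finished
            RESP_q.task_done()
            break
        for txn in resp:
            _txn = E_Transaction(txn)
            print(_txn)
        RESP_q.task_done()

# In the manager, after the producer processes have been joined,
# unblock each consumer with one sentinel:
#     for _ in range(self.max_processes // 2):
#         self.RESP_q.put("END")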
Here is an example of a multiple-consumers, multiple-producers implementation. You can use the daemon flag when instantiating the processes so that they are automatically closed when the program quits. You can then use JoinableQueue and join the queues (instead of joining the processes) so that the program quits when there is no item left to process. You should use the if __name__ == "__main__": guard to launch the program without causing a recursive execution of that program.
from multiprocessing import Process, JoinableQueue
from time import sleep


def consumer(in_queue: JoinableQueue, out_queue: JoinableQueue):
    while True:
        item = in_queue.get()
        sleep(0.5)
        s = str(item)
        out_queue.put(s)
        in_queue.task_done()


def producer(in_queue: JoinableQueue):
    while True:
        item = in_queue.get()
        sleep(0.5)
        n = int(item)
        print(n)
        in_queue.task_done()


if __name__ == "__main__":
    number_queue = JoinableQueue()
    str_queue = JoinableQueue()

    for _ in range(4):
        Process(target=consumer, args=(number_queue, str_queue), daemon=True).start()
        Process(target=producer, args=(str_queue,), daemon=True).start()

    for i in range(100):
        number_queue.put(i)

    number_queue.join()
    str_queue.join()
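Joining the queues rather than the processes works here because task_done() is called once per item: number_queue.join() returns only after every number has been converted, and str_queue.join() only after every string has been printed. The daemon workers are then terminated abruptly when the main process exits, which is safe because both queues are already drained at that point.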