Reputation: 4070
I have one producer and multiple consumers. When I use too much consumers, the Q becomes empty very fast and then all my consumers stop working while the producer is still producing items...
My code
Producer.py
class Producer():
def __init__(self,q):
self.q = q
print("Producer")
def generateItem(self):
for i in range(1,100):
print("generated item ",i,"\n")
self.q.put(i)
Consumer.py :
class Consumer():
def __init__(self,q):
print("Consumer")
self.q = q
def consumeItem(self):
while True:
if(not self.q.empty) :
print("consumed item : " ,self.q.get(),"\n")
else :
print("consumer is done")
MainService:
import queue
import threading
q=queue.Queue(maxsize=0)
from Producer import Producer
from Consumer import Consumer
producer=Producer(q)
consumer=Consumer(q)
threads=[]
threads.append(threading.Thread(target=producer.generateItem,name="thread1"))
threads.append(threading.Thread(target=consumer.consumeItem,name="thread2"))
threads.append(threading.Thread(target=consumer.consumeItem,name="thread3"))
threads.append(threading.Thread(target=consumer.consumeItem,name="thread4"))
for thread in threads:
thread.start()
log :
generated item 1
generated item 2
generated item 3
consumed item :1
consumed item :2
consumed item :3
consumer is done
consumer is done
consumer is done
generated item 4
generated item 5
....
Upvotes: 1
Views: 507
Reputation: 92854
You may apply Thread Event object as communication mechanism.
This is one of the simplest mechanisms for communication between threads: one thread signals an event and other threads wait for it.
Sample working schema (producing 10 items):
import queue
import threading
import sys
import time
q = queue.Queue(maxsize=0)
class Producer():
def __init__(self, q):
self.q = q
print("Producer")
sys.stdout.flush()
def generate_item(self, event):
t_name = threading.current_thread().name
for i in range(1, 10):
print(f"{t_name} generated item {i}")
sys.stdout.flush()
self.q.put(i)
time.sleep(0.5)
print(f'*** producer {t_name} finished work')
sys.stdout.flush()
event.set()
class Consumer():
def __init__(self, q):
print("Consumer")
sys.stdout.flush()
self.q = q
def consume_item(self, event):
t_name = threading.current_thread().name
while True:
if not self.q.empty():
try:
val = self.q.get(timeout=0.5)
except queue.Empty:
if event.is_set():
break
else:
print(f"{t_name} consumed item : {val}")
sys.stdout.flush()
elif event.is_set():
break
print(f'*** consumer {t_name} finished work')
sys.stdout.flush()
producer = Producer(q)
consumer = Consumer(q)
e = threading.Event()
threads = []
threads.append(threading.Thread(target=producer.generate_item, args=(e,), name="thread1"))
threads.append(threading.Thread(target=consumer.consume_item, args=(e,), name="thread2"))
threads.append(threading.Thread(target=consumer.consume_item, args=(e,), name="thread3"))
threads.append(threading.Thread(target=consumer.consume_item, args=(e,), name="thread4"))
for thread in threads:
thread.start()
#
# for thread in threads:
# thread.join()
The output:
Producer
Consumer
thread1 generated item 1
thread2 consumed item : 1
thread1 generated item 2
thread3 consumed item : 2
thread1 generated item 3
thread4 consumed item : 3
thread1 generated item 4
thread4 consumed item : 4
thread1 generated item 5
thread2 consumed item : 5
thread1 generated item 6
thread2 consumed item : 6
thread1 generated item 7
thread4 consumed item : 7
thread1 generated item 8
thread4 consumed item : 8
thread1 generated item 9
thread2 consumed item : 9
*** producer thread1 finished work
*** consumer thread3 finished work
*** consumer thread2 finished work
*** consumer thread4 finished work
Upvotes: 1