JeyJ
JeyJ

Reputation: 4070

Producer/multi-consumer problem: consumers finish their work before the producer is done

I have one producer and multiple consumers. When I use too many consumers, the queue becomes empty very quickly, and then all my consumers stop working while the producer is still producing items...

My code

Producer.py

class Producer:
    """Puts the integers 1 through 99 onto the queue it was given."""

    def __init__(self, q):
        # q: the queue object that generated items are put onto.
        print("Producer")
        self.q = q

    def generateItem(self):
        """Announce and enqueue each integer from 1 to 99, in ascending order."""
        item = 1
        while item < 100:
            print("generated item ", item, "\n")
            self.q.put(item)
            item += 1

Consumer.py :

class Consumer():
    """Takes items off a queue and prints each one."""

    def __init__(self, q):
        # q: the queue object that items are taken from.
        print("Consumer")
        self.q = q

    def consumeItem(self, timeout=1.0):
        """Consume items until the queue stays empty for ``timeout`` seconds.

        Blocking on ``get`` (instead of polling ``empty``) lets the consumer
        wait for a producer that is momentarily slower than the consumers,
        rather than giving up the instant the queue runs dry.

        Args:
            timeout: Seconds to wait for the next item before concluding that
                no more work is coming. Choose a value larger than the
                longest expected gap between two produced items.

        Returns:
            None. Prints "consumer is done" once, just before returning.
        """
        # Imported locally because this file has no top-level imports.
        import queue

        while True:
            try:
                item = self.q.get(timeout=timeout)
            except queue.Empty:
                # Nothing arrived within the wait window: stop this consumer.
                print("consumer is done")
                return
            print("consumed item : ", item, "\n")

MainService:

import queue

import threading
# Queue's default maxsize is 0, which places no upper bound on its size.
q = queue.Queue()

from Producer import Producer
from Consumer import Consumer

producer = Producer(q)
consumer = Consumer(q)

# thread1 runs the producer; thread2-thread4 all run the same Consumer
# instance's consumeItem method.
threads = [
    threading.Thread(target=producer.generateItem, name="thread1"),
    threading.Thread(target=consumer.consumeItem, name="thread2"),
    threading.Thread(target=consumer.consumeItem, name="thread3"),
    threading.Thread(target=consumer.consumeItem, name="thread4"),
]

for thread in threads:
    thread.start()

log :

generated item 1
generated item 2
generated item 3
consumed item :1
consumed item :2
consumed item :3
consumer is done
consumer is done
consumer is done
generated item 4
generated item 5
....

Upvotes: 1

Views: 507

Answers (1)

RomanPerekhrest
RomanPerekhrest

Reputation: 92854

You can use a `threading.Event` object as the communication mechanism between the producer and the consumers.

This is one of the simplest mechanisms for communication between threads: one thread signals an event and other threads wait for it.

Sample working example (producing 9 items):

import queue
import threading
import sys
import time

# Queue's default maxsize is 0, which places no upper bound on its size.
q = queue.Queue()


class Producer():
    """Puts consecutive integers on a queue and signals when it has finished."""

    def __init__(self, q):
        # q: the queue object that generated items are put onto.
        self.q = q
        print("Producer", flush=True)

    def generate_item(self, event, count=9, delay=0.5):
        """Put the integers ``1..count`` on the queue, then set ``event``.

        Args:
            event: Object with a ``set()`` method (a ``threading.Event`` in
                the surrounding script) used to tell consumers that no more
                items will be produced.
            count: Number of items to generate. Defaults to 9.
            delay: Seconds to sleep after each item is queued. Defaults to 0.5.
        """
        t_name = threading.current_thread().name
        try:
            for i in range(1, count + 1):
                print(f"{t_name} generated item {i}", flush=True)
                self.q.put(i)
                time.sleep(delay)
            print(f'*** producer {t_name} finished work', flush=True)
        finally:
            # Always signal completion, even if producing raised; otherwise
            # anything waiting on the event would never be released.
            event.set()


class Consumer():
    """Takes items off a queue until the producer signals that it is finished."""

    def __init__(self, q):
        # q: the queue object that items are taken from.
        print("Consumer", flush=True)
        self.q = q

    def consume_item(self, event, timeout=0.5):
        """Consume items until ``event`` is set and the queue has been drained.

        Args:
            event: Object with an ``is_set()`` method (a ``threading.Event``
                in the surrounding script) that the producer sets after
                queuing its last item.
            timeout: Seconds to block waiting for an item before re-checking
                ``event``. Defaults to 0.5.
        """
        t_name = threading.current_thread().name
        while True:
            try:
                # Block for the next item rather than polling empty() in a
                # tight loop, which would spin the CPU while the queue is idle.
                val = self.q.get(timeout=timeout)
            except queue.Empty:
                # Test the event first: once it is set no further items are
                # queued, so an empty queue seen afterwards stays empty.
                if event.is_set() and self.q.empty():
                    break
            else:
                print(f"{t_name} consumed item : {val}", flush=True)
        print(f'*** consumer {t_name} finished work', flush=True)


producer = Producer(q)
consumer = Consumer(q)
e = threading.Event()

# thread1 runs the producer; thread2-thread4 all run the same Consumer
# instance. Every thread receives the shared event as its only argument.
threads = [
    threading.Thread(target=producer.generate_item, args=(e,), name="thread1"),
    threading.Thread(target=consumer.consume_item, args=(e,), name="thread2"),
    threading.Thread(target=consumer.consume_item, args=(e,), name="thread3"),
    threading.Thread(target=consumer.consume_item, args=(e,), name="thread4"),
]

# The threads are started here but not joined.
for thread in threads:
    thread.start()

The output:

Producer
Consumer
thread1 generated item 1
thread2 consumed item : 1
thread1 generated item 2
thread3 consumed item : 2
thread1 generated item 3
thread4 consumed item : 3
thread1 generated item 4
thread4 consumed item : 4
thread1 generated item 5
thread2 consumed item : 5
thread1 generated item 6
thread2 consumed item : 6
thread1 generated item 7
thread4 consumed item : 7
thread1 generated item 8
thread4 consumed item : 8
thread1 generated item 9
thread2 consumed item : 9
*** producer thread1 finished work
*** consumer thread3 finished work
*** consumer thread2 finished work
*** consumer thread4 finished work

Upvotes: 1

Related Questions