JDD

Reputation: 335

Single Producer Multiple Consumer

I want a single-producer, multiple-consumer architecture in multi-threaded Python. The intended operation is:

  1. Producer produces the data
  2. Consumers 1..N (N is predetermined) block until the data arrives, and then each processes the SAME data in a different way.

So I need all the consumers to get the same data from the producer.

When I used a Queue for this, I found that, with my implementation, every consumer except the first is starved.

One possible solution is to give each consumer thread its own queue and have the producer push the same data into every queue. Is there a better way to do this?

from threading import Thread
import time
import random
from Queue import Queue

my_queue = Queue(0)

def Producer():
    global my_queue
    my_list = []
    for each in range (50):
        my_list.append(each)
    my_queue.put(my_list)

def Consumer1():
    print "Consumer1"
    global my_queue
    print my_queue.get()
    my_queue.task_done()

def Consumer2():
    print "Consumer2"
    global my_queue
    print my_queue.get()
    my_queue.task_done()


P = Thread(name = "Producer", target = Producer)

C1 = Thread(name = "Consumer1", target = Consumer1)

C2 = Thread(name = "Consumer2", target = Consumer2)


P.start()

C1.start()

C2.start()

In the example above, C2 blocks indefinitely because C1 consumes the data produced by P. What I want instead is for both C1 and C2 to be able to access the SAME data produced by P.

Thanks for any code/pointers!

Upvotes: 16

Views: 13079

Answers (4)

Buzz

Reputation: 1412

I know it might be overkill, but what about using the signal/slot framework from Qt? For consistency, QThread can be used instead of threading.Thread.

from __future__ import annotations # Needed for forward Consumer typehint in register_consumer

from queue import Queue
from typing import List

from PySide2.QtCore import QThread, QObject, QCoreApplication, Signal, Slot, Qt
import time
import random

def thread_name():
    # Convenience function: returns the name of the thread it is called from
    return QThread.currentThread().objectName()

class Producer(QThread):
    product_available = Signal(list)

    def __init__(self):
        QThread.__init__(self, objectName='ThreadProducer')
        self.consumers: List[Consumer] = list()
        # See Consumer class comments for info (exactly the same reason here)
        self.internal_consumer_queue = Queue()
        self.active = True

    def run(self):
        my_list = [each for each in range(5)]
        self.product_available.emit(my_list)
        print(f'Producer: from thread {QThread.currentThread().objectName()} I\'ve sent my products\n')
        while self.active:
            consumer: Consumer = self.internal_consumer_queue.get(block=True)
            print(f'Producer: {consumer} has told me it has completed his task with my product! '
                  f'(Thread {thread_name()})')
            if consumer not in self.consumers:
                raise ValueError(f'Consumer {consumer} was not registered')
            self.consumers.remove(consumer)
            if len(self.consumers) == 0:
                print('All consumers have completed their task! I\'m terminating myself')
                self.active = False

    @Slot(object)
    def on_task_done_by_consumer(self, consumer: Consumer):
        self.internal_consumer_queue.put(consumer)


    def register_consumer(self, consumer: Consumer):
        if consumer in self.consumers:
            return
        self.consumers.append(consumer)
        consumer.task_done_with_product.connect(self.on_task_done_by_consumer)


class Consumer(QThread):
    task_done_with_product = Signal(object)
    def __init__(self, name: str, producer: Producer):
        self.name = name
        # Super init and set Thread name
        QThread.__init__(self, objectName=f'Thread_Of_{self.name}')
        self.producer = producer
        # See method on_product_available doc
        self.internal_queue = Queue()

    def run(self) -> None:
        self.producer.product_available.connect(self.on_product_available, Qt.ConnectionType.UniqueConnection)
        # Thread loop waiting for product availability
        product = self.internal_queue.get(block=True)
        print(f'{self.name}: Product {product} received and elaborated in thread {thread_name()}\n\n')

        # Tell the producer I've done
        self.task_done_with_product.emit(self)

        # Now the thread is naturally closed

    @Slot(list)
    def on_product_available(self, product: list):
        """
        As a limitation of PySide, it seems that lists are not supported for QueuedConnection. Working around it
        with an internal queue appears to solve the problem.
        """
        # This is executed in Main Loop!
        print(f'{self.name}: In thread {thread_name()} I received the product, and I\'m queuing it for being elaborated'
              f'in consumer thread')
        self.internal_queue.put(product)
        # Quit the thread
        self.active = False

    def __repr__(self):
        # Needed in case of exception for representing current consumer
        return f'{self.name}'


# Needed to run the main and thread event loops
app = QCoreApplication()

QThread.currentThread().setObjectName('MainThread')

producer = Producer()

c1 = Consumer('Consumer1', producer)
c1.start()
producer.register_consumer(c1)

c2 = Consumer('Consumer2', producer)
c2.start()
producer.register_consumer(c2)

producer.product_available.connect(c1.on_product_available)
producer.product_available.connect(c2.on_product_available)

# Start the Producer thread LAST!
producer.start()

app.exec_()

Results:

Producer: from thread ThreadProducer I've sent my products
Consumer1: In thread MainThread I received the product, and I'm queuing it for being elaboratedin consumer thread
Consumer1: Product [0, 1, 2, 3, 4] received and elaborated in thread Thread_Of_Consumer1
Consumer2: In thread MainThread I received the product, and I'm queuing it for being elaboratedin consumer thread
Consumer2: Product [0, 1, 2, 3, 4] received and elaborated in thread Thread_Of_Consumer2
Producer: Consumer1 has told me it has completed his task with my product! (Thread ThreadProducer)
Producer: Consumer2 has told me it has completed his task with my product! (Thread ThreadProducer)
All consumers have completed their task! I'm terminating myself

Notes:

  • The step-by-step explanation is in the code comments. If anything is unclear, I'll do my best to clarify.
  • Unfortunately, I have not found a way to use QueuedConnection (doc here) to execute the Slot directly in the proper thread, so an internal queue is used to pass information from the main loop to the proper thread (in both Producer and Consumer). It seems that list and object cannot be meta-registered in PySide/PyQt for queueing purposes.

Upvotes: 0

Jiusheng Chen

Reputation: 51

A verified example with a single producer and five consumers.

from multiprocessing import Process, JoinableQueue
import time
import os

q = JoinableQueue()

def producer():
    for item in range(30):
        time.sleep(2)
        q.put(item)
    pid = os.getpid()
    print(f'producer {pid} done')


def worker():
    while True:
        item = q.get()
        pid = os.getpid()
        print(f'pid {pid} Working on {item}')
        print(f'pid {pid} Finished {item}')
        q.task_done()

for i in range(5):
    # start() returns None, so there is nothing useful to keep in a variable
    Process(target=worker, daemon=True).start()

producers = []
# it is easy to extend it to multi producers.
for i in range(1):
    p = Process(target=producer)
    producers.append(p)
    p.start()

# make sure producers done
for p in producers:
    p.join()

# block until all workers are done
q.join()
print('All work completed')

Explanation:

  1. One producer and five consumers in this example.
  2. JoinableQueue ensures that every element put on the queue gets processed: each worker calls task_done() to report that an element is finished, and q.join() blocks until every element has been marked as done.
  3. Because of point 2, there is no need to join each worker.
  4. It is important, however, to join the producer so that it has finished storing elements in the queue. Otherwise q.join() may return at once, before anything has been queued, and the program moves on immediately.

Upvotes: 1

Dmitry Nedbaylo

Reputation: 2334

Your producer creates only one job:

my_queue.put(my_list)

For example, put my_list twice and both consumers get work:

def Producer():
    global my_queue
    my_list = []
    for each in range (50):
        my_list.append(each)
    my_queue.put(my_list)
    my_queue.put(my_list)

This way you put two jobs on the queue, both referring to the same list.

However, I have to warn you: modifying the same data from different threads without thread synchronization is generally a bad idea.
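To make the warning concrete, here is a minimal sketch, assuming Python 3's queue module. The helper names enqueue_for_consumers and drain are made up for this illustration. It shows that putting the same list on the queue twice hands both consumers the very same object, and that queueing a copy per consumer avoids the sharing. Note that list(data) is only a shallow copy, so nested mutable items would still be shared.

```python
from queue import Queue


def enqueue_for_consumers(q, data, n_consumers, copy=True):
    """Put one entry per consumer on q (hypothetical helper).

    With copy=True every consumer receives its own shallow copy of data;
    with copy=False they all receive the same list object.
    """
    for _ in range(n_consumers):
        q.put(list(data) if copy else data)


def drain(q, n):
    """Take n entries off the queue and return them as a list."""
    return [q.get() for _ in range(n)]


if __name__ == "__main__":
    q = Queue()

    enqueue_for_consumers(q, [0, 1, 2], 2, copy=False)
    a, b = drain(q, 2)
    print(a is b)  # both entries are the same list object

    enqueue_for_consumers(q, [0, 1, 2], 2, copy=True)
    c, d = drain(q, 2)
    c.append(99)   # mutating one copy leaves the other untouched
    print(d)
```

If the consumers only read the data, sharing one object is fine and cheaper. Copies (or an immutable tuple) matter once any consumer mutates what it receives.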

In any case, the single-queue approach will not work well for you, since a single queue is meant to be drained by threads that all run the same algorithm.

So I advise you to go with a unique queue per consumer, since the other solutions are not as trivial.
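For comparison, here is one of the less trivial alternatives: a rough sketch built on threading.Condition rather than queues. The Broadcast class and its method names are hypothetical and not part of any library. It keeps only the latest published value, so a slow consumer can miss intermediate values. That is acceptable for the one-shot case in the question but not for a stream in which every item must reach every consumer.

```python
import threading


class Broadcast:
    """Hypothetical one-slot broadcaster: every waiter sees the same value."""

    def __init__(self):
        self._cond = threading.Condition()
        self._version = 0   # incremented on every publish
        self._value = None

    def publish(self, value):
        # Store the value and wake every waiting consumer.
        with self._cond:
            self._value = value
            self._version += 1
            self._cond.notify_all()

    def wait_for(self, seen_version, timeout=None):
        # Block until something newer than seen_version has been published.
        with self._cond:
            ready = self._cond.wait_for(
                lambda: self._version > seen_version, timeout
            )
            if not ready:
                raise TimeoutError("nothing published in time")
            return self._version, self._value


if __name__ == "__main__":
    channel = Broadcast()
    received = []

    def consumer(name):
        _, data = channel.wait_for(0, timeout=5)
        received.append((name, sum(data)))

    threads = [threading.Thread(target=consumer, args=(f"C{i}",)) for i in (1, 2)]
    for t in threads:
        t.start()
    channel.publish(list(range(50)))
    for t in threads:
        t.join()
    print(sorted(received))
```

Because the predicate checks the version number, a consumer that starts waiting after the producer has already published still gets the value instead of blocking forever.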

Upvotes: 3

Jonathon Reinhart

Reputation: 137398

How about a per-thread queue then?

As part of starting each consumer, you would also create another Queue and add it to a list of "all thread queues". Then start the producer, passing it the list of all queues, so that it can push the data into every one of them.
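A minimal sketch of that idea, assuming Python 3 (queue.Queue rather than the Python 2 Queue module used in the question). The names run_fan_out and _DONE, and the handler functions passed in, are illustrative choices and not anything from the original post.

```python
import threading
from queue import Queue

_DONE = object()  # sentinel marking the end of the stream (assumed convention)


def producer(queues, items):
    # Push every item into every consumer's queue, then signal completion.
    for item in items:
        for q in queues:
            q.put(item)
    for q in queues:
        q.put(_DONE)


def consumer(q, handle, results):
    # Block on this consumer's own queue and process items until the sentinel.
    while True:
        item = q.get()
        if item is _DONE:
            break
        results.append(handle(item))


def run_fan_out(items, handlers):
    """Give each handler its own queue and thread; return one result list per handler."""
    queues = [Queue() for _ in handlers]
    results = [[] for _ in handlers]
    consumers = [
        threading.Thread(target=consumer, args=(q, h, r))
        for q, h, r in zip(queues, handlers, results)
    ]
    for t in consumers:
        t.start()
    p = threading.Thread(target=producer, args=(queues, items))
    p.start()
    p.join()
    for t in consumers:
        t.join()
    return results


if __name__ == "__main__":
    # Two consumers process the SAME list in different ways.
    totals, lengths = run_fan_out([list(range(50))], [sum, len])
    print(totals, lengths)
```

Every consumer receives a reference to the same object, so this is cheap, but the consumers should treat the data as read-only or work on their own copies.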

Upvotes: 1
