jack87

Reputation: 465

Python design pattern: queue with workers

I'm currently working on a project that involves three components: an observer that checks for changes in a directory, a worker, and a command-line interface.

What I want to achieve is:

I don't know how to achieve this in Python, in terms of which components to use and how to link the three of them.

I thought of a singleton worker where the observer adds jobs to a queue, but 1) I was not able to write working code and 2) how would the checker fit in?

Another solution I thought of is multiple child processes spawned by a parent that owns the queue, but I'm a bit lost...

Thanks for any advice.

Upvotes: 0

Views: 2486

Answers (1)

a_guest

Reputation: 36349

I'd use some kind of observer pattern or publish-subscribe pattern. For the former you can use, for example, RxPY, the Python version of ReactiveX. For a more basic example, though, let's stay with the Python standard library. Parts of your program can subscribe to the worker and receive updates from it via queues, for example.

import itertools as it
from queue import Queue
from threading import Thread
import time


class Observable(Thread):
    def __init__(self):
        super().__init__()
        self._observers = []

    def notify(self, msg):
        for obs in self._observers:
            obs.put(msg)

    def subscribe(self, obs):
        self._observers.append(obs)


class Observer(Thread):
    def __init__(self):
        super().__init__()
        self.updates = Queue()


class Watcher(Observable):
    def run(self):
        # Stand-in for a real directory watcher: emits 0, 1, 2, ... once per second.
        for i in it.count():
            self.notify(i)
            time.sleep(1)


class Worker(Observable, Observer):
    def run(self):
        while True:
            # Blocks until the watcher publishes a new task.
            task = self.updates.get()
            self.notify((str(task), 'start'))
            time.sleep(1)  # Simulated work.
            self.notify((str(task), 'stop'))


class Supervisor(Observer):
    def __init__(self):
        super().__init__()
        self._statuses = {}

    def run(self):
        while True:
            # Each update is a (task, state) tuple sent by the worker.
            task, state = self.updates.get()
            print((task, state))
            self._statuses[task] = state
            # Do something based on status updates.
            if state == 'stop':
                del self._statuses[task]


watcher = Watcher()
worker = Worker()
supervisor = Supervisor()

watcher.subscribe(worker.updates)
worker.subscribe(supervisor.updates)

supervisor.start()
worker.start()
watcher.start()
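
The Watcher above only counts upwards as a placeholder. As a rough sketch of how it could watch a directory instead, here is a polling variant. The names `new_entries` and `DirectoryWatcher`, the `interval` parameter and the `stop()` method are my own assumptions for illustration, and polling with `os.listdir` is just the simplest approach, not the only one.

```python
import os
import queue
import threading


def new_entries(directory, seen):
    """Return names in `directory` that are not in `seen` yet, and record them."""
    current = set(os.listdir(directory))
    fresh = sorted(current - seen)
    seen.update(fresh)
    return fresh


class DirectoryWatcher(threading.Thread):
    """Hypothetical replacement for Watcher: publishes new file names to subscribers."""

    def __init__(self, directory, interval=1.0):
        super().__init__(daemon=True)
        self._directory = directory
        self._interval = interval
        self._observers = []
        self._stop_event = threading.Event()

    def subscribe(self, obs):
        # `obs` is a queue, as in the example above.
        self._observers.append(obs)

    def stop(self):
        self._stop_event.set()

    def run(self):
        seen = set()
        while not self._stop_event.is_set():
            for name in new_entries(self._directory, seen):
                for obs in self._observers:
                    obs.put(name)
            # wait() returns early once stop() has been called.
            self._stop_event.wait(self._interval)
```

It is wired up the same way as before, e.g. `watcher = DirectoryWatcher('/some/dir')` followed by `watcher.subscribe(worker.updates)`. Note that this sketch only reports names that appear; it does not notice modifications or deletions.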

Many variations are possible, though, so look through the various patterns and pick the one that suits you best.
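
One such variation, as a minimal sketch: a single task queue shared by several worker threads, with a second queue for status messages and a `None` sentinel to shut the workers down. The function name `run_jobs` and the `num_workers` parameter are made up for this example, and it assumes that `None` is never a real job.

```python
import queue
import threading


def run_jobs(jobs, num_workers=2):
    """Process `jobs` with a pool of threads and return the status messages."""
    tasks = queue.Queue()
    statuses = queue.Queue()

    def worker():
        while True:
            job = tasks.get()
            if job is None:  # Sentinel: no more work for this worker.
                break
            statuses.put((job, 'start'))
            # ... the real work would go here ...
            statuses.put((job, 'stop'))

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for job in jobs:
        tasks.put(job)
    for _ in threads:
        tasks.put(None)  # One sentinel per worker.
    for t in threads:
        t.join()

    # All workers have finished, so the status queue can be drained safely.
    collected = []
    while not statuses.empty():
        collected.append(statuses.get())
    return collected
```

Here the status queue plays the role of the supervisor's `updates` queue: a command-line interface could read from it to report what each job is doing.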

Upvotes: 2
