Sebastian
Sebastian

Reputation: 5873

Should Observers be notified in separate threads each one?

I know it sounds heavy weight, but I'm trying to solve an hypothetical situation. Imagine you have N observers of some object. Each one interested in the object state. When applying the Observer Pattern the observable object tends to iterate through its observer list invoking the observer notify()|update() method.

Now imagine that a specific observer has a lot of work to do with the state of the observable object. That will slow down the last notification, for example.

So, in order to avoid slowing down notifications to all observers, one thing we can do is to notify the observer in a separate thread. In order for that to work, I suppose that a thread for each observer is needed. That is a painful overhead we are having in order to avoid the notification slow down caused by heavy work. Worst than slowing down if thread approach is used, is dead threads caused by infinite loops. It would be great reading experienced programmers for this one.

Example

This is a vague example in order to demonstrate and, hopefully, clarify the basic idea that I don't even tested:

class Observable(object):
    def __init__(self):
        self.queues = {}

    def addObserver(self, observer):
        if not observer in self.queues:
            self.queues[observer] = Queue()
            ot = ObserverThread(observer, self.queues[observer])
            ot.start()

    def removeObserver(self, observer):
        if observer in self.queues:
            self.queues[observer].put('die')
            del self.queues[observer]

    def notifyObservers(self, state):
        for queue in self.queues.values():
            queue.put(state)

class ObserverThread(Thread):
    def __init__(self, observer, queue):
        self.observer = observer
        self.queue = queue

    def run(self):
        running = True
        while running:
            state = self.queue.get()
            if state == 'die':
                running = False
            else:
                self.observer.stateChanged(state)

Upvotes: 11

Views: 3752

Answers (4)

Haotian Yang
Haotian Yang

Reputation: 126

The answer of @Pathai is valid in a lot of cases.

One is that you are observing changes in a database. In many ways you can't reconstruct the final state from the snapshots alone, especially if your state is fetched as a complex query from the database, and the snapshot is an update to the database.

To implement it, I'd suggest using an Event object:

class Observer:
    def __init__(self):
        self.event = threading.Event()

# in observer:
while self.event.wait():
    # do something
    self.event.clear()

# in observable:
observer.event.set()

Upvotes: 0

Alexei Kaigorodov
Alexei Kaigorodov

Reputation: 13535

Let each observer decide itself if its reaction is heavyweight, and if so, start a thread, or submit a task to a thread pool. Making notification in a separate thread is not a good solution: while freeing the observable object, it limits the processor power for notifications with single thread. If you do not trust your observers, then create a thread pool and for each notification, create a task and submit it to the pool.

Upvotes: 3

Lior Kogan
Lior Kogan

Reputation: 20658

You're on the right track.

It is common for each observer to own its own input-queue and its own message handling thread (or better: the queue would own the thread, and the observer would own the queue). See Active object pattern.

There are some pitfalls however:

  • If you have 100's or 1000's of observers you may need to use a thread pool pattern
  • Note the you'll lose control over the order in which events are going to be processed (which observer handles the event first). This may be a non-issue, or may open a Pandora box of very-hard-to-detect bugs. It depends on your specific application.
  • You may have to deal with situations where observers are deleted before notifiers. This can be somewhat tricky to handle correctly.
  • You'll need to implement messages instead of calling functions. Message generation may require more resources, as you may need to allocate memory, copy objects, etc. You may even want to optimize by implementing a message pool for common message types (you may as well choose to implement a message factory that wrap such pools).
  • To further optimize, you'll probably like to generate one message and send it to all to observers (instead of generating many copies of the same message). You may need to use some reference counting mechanism for your messages.

Upvotes: 8

Narendra Pathai
Narendra Pathai

Reputation: 41985

In my opinion when you have a large no of Observers for an Observable, which do heavy processing, then the best thing to do is to have a notify() method in Observer.

Use of notify(): Just to set the dirty flag in the Observer to true. So whenever the Observer thread will find it appropriate it will query the Observable for the required updates.

And this would not require heavy processing on Observable side and shift the load to the Observer side.

Now it depends on the Observers when they have to Observe.

Upvotes: 0

Related Questions