JNevens

Reputation: 11982

Monitoring system with events in Python

I am creating a project in Python and I would like to add a monitoring system that makes use of events and event handlers. I would like this system to be available throughout the project. I have the following operations in mind:

How can I create such a system? Are there any libraries that can help me with this? I am especially wondering how I can make this system in such a way that it is transparently available throughout the project.

Upvotes: 9

Views: 4376

Answers (4)

Erik Aronesty

Reputation: 12897

I use this one for health monitoring. It lets the user specify callbacks and supports both threaded, active monitors and passive monitors:

https://gist.github.com/earonesty/4ccf8fc9bde6feac30e5c155e54dfa5f

The code is pasted below, without the tests (which are longer than the code itself):

import logging
import threading
import time

log = logging.getLogger(__name__)

class MonitorInstance:
    def __init__(self, parent, label, func, threshold, active, metric):
        self.parent = parent
        self.label = label
        self.func = func
        self.threshold = threshold
        self.active = active
        self.metric = metric
        self.__errors = None

    def ok(self):
        if self.__errors is None or self.__errors:
            self.parent._ok(self)
        self.__errors = 0
        if self.metric:
            self.metric.set(0)

    def error(self):
        if not self.__errors:
            self.parent._error(self)

        if self.__errors is None:
            self.__errors = 0

        self.__errors += 1

        if self.metric:
            self.metric.inc()

    def check(self):
        try:
            self.func()
            self.ok()
        except Exception as e:
            log.error("%s error: %s", self.label, e)
            self.error()

    @property
    def healthy(self):
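        # before the first check, __errors is None: report "not healthy"
        # instead of raising TypeError on None < int
        return self.__errors is not None and self.__errors < self.threshold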

DEFAULT_THRESHOLD = 1           # errors to cause fault
DEFAULT_CHECKSECS = 5           # time in secs between checks

class Monitor:
    def __init__(self, health_callback=None, check_secs=DEFAULT_CHECKSECS, use_thread=False):
        self.active = []        # active monitors
        self.alerts = set()     # thresholds currently triggered (not healthy)
        self.health_callback = health_callback
        self.healthy = False    # default: not healthy unless a monitor is added!
        self.check_secs = check_secs
        self.last_check = 0

        if use_thread:
            assert self.check_secs > 0, "threads need to sleep"
            threading.Thread(target=self._thread_loop, daemon=True).start()

    def add(self, label, check, threshold=DEFAULT_THRESHOLD, active=False, metric=None):
        inst = MonitorInstance(self, label, check, threshold, active, metric)
        if active:
            self.active.append(inst)
        inst.check()
        return inst

    def _error(self, inst):
        self.alerts.add(inst)
        if self.healthy:
            self._callback(False)
        self.healthy = False

    def _thread_loop(self):
        while True:
            self.check()
            time.sleep(self.check_secs)

    def _callback(self, value):
        if self.health_callback is not None:
            try:
                self.health_callback(value)
            except Exception:
                # health callback should always succeed!
                log.exception("deadlyexes: error calling %s", self.health_callback)

    def _ok(self, inst):
        self.alerts.discard(inst)
        if not self.healthy and not self.alerts:
            self._callback(True)
            self.healthy = True

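    def check(self, force=False):
        if not force and (time.time() < (self.last_check + self.check_secs)):
            return False
        # record the time of this run so the interval test above can take effect
        self.last_check = time.time()

        # returns True if at least one check was done
        checked = False
        # copy into a new list so _ok/_error can modify self.alerts while we iterate
        for inst in list(self.alerts) + self.active:
            try:
                checked = True
                inst.check()
            except Exception:
                pass
        return checked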

Upvotes: 0

mastro35

Reputation: 143

You can create your own system using a distributed messaging system such as ZeroMQ (zmq) and the publish-subscribe pattern.

I built something similar while writing a customizable workflow engine (Flows, https://github.com/mastro35/flows).
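To illustrate the publish-subscribe pattern itself, here is a minimal in-process sketch. It is a stand-in written in plain Python, not zmq: there are no sockets and nothing crosses a process boundary. The `EventBus` class, the module-level `bus` instance and the topic names are all hypothetical. Matching topics by prefix is borrowed from how ZeroMQ SUB sockets filter messages.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, List

Handler = Callable[[str, Any], None]


class EventBus:
    """In-process publish-subscribe bus (hypothetical helper, not part of zmq)."""

    def __init__(self) -> None:
        # Maps a topic prefix to the handlers subscribed to it.
        self._subscribers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, prefix: str, handler: Handler) -> None:
        """Register handler for every topic that starts with prefix."""
        self._subscribers[prefix].append(handler)

    def publish(self, topic: str, payload: Any = None) -> int:
        """Deliver payload to matching handlers; return how many were called."""
        delivered = 0
        # Copy the items so a handler may subscribe during delivery.
        for prefix, handlers in list(self._subscribers.items()):
            if topic.startswith(prefix):
                for handler in list(handlers):
                    handler(topic, payload)
                    delivered += 1
        return delivered


# Module-level instance: every module that imports it shares the same bus.
bus = EventBus()

received = []
bus.subscribe("db.", lambda topic, payload: received.append((topic, payload)))
bus.publish("db.query", {"ms": 12})    # reaches the "db." subscriber
bus.publish("cache.miss", "user:42")   # no subscriber, so nothing is called
```

Because publishers only know topic names, a monitor can be added or removed without touching the code that emits the events. Moving to zmq would replace the dictionary with PUB and SUB sockets while keeping that separation.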

Bye D.

Upvotes: 0

Paul Cornelius

Reputation: 10946

You can do most of what you want with forty lines of Python code. This is my own design that I use all the time. The function names are chosen to make it a drop-in replacement for Qt's "signals" and "slots".

It's simple to use. You create a PSignal. You register handlers by calling the connect method. A handler can be any callable. When an event occurs you emit a signal (i.e., notify an event) by calling the emit function. Every registered callable runs at that point. The object calling emit doesn't know, or care, whether anyone is listening or what happens if they are.

You can also disconnect a handler.

There is a lot of debugging code because I discovered that otherwise certain errors can be difficult to track down.

In your question you wanted each handler to be a monitor, and in my design handlers are just functions. But it seems to me that your "monitor" concept is independent of the event/handler mechanism. You're going to have to write functions to make your application go, and it should be pretty easy to make those functions call your monitors.

The code is extensively tested with Python 3.3.

#! python3
import traceback

class PSignal:
    def __init__(self, debug=False):
        self.debug = debug
        self.__handlers = []

    def clear(self):
        """Deletes all the handlers."""
        self.__handlers.clear()

    def connect(self, f):
        """f is any callable (function, bound method, lambda, ...)."""
        if not callable(f):
            raise ValueError("Object {!r} is not callable".format(f))
        self.__handlers.append(f)
        if self.debug:
            print("PSIGNAL: Connecting", f, self.__handlers)

    def disconnect(self, f):
        """Removes the first registration of f; does nothing if f is not connected."""
        if f in self.__handlers:
            self.__handlers.remove(f)

    def emit(self, *x, **y):
        self._emit(*x, **y)

    def check_debug(self):
        if self.debug and self.__handlers:
            print("PSIGNAL: Signal emitted")
            traceback.print_stack()

    def _emit(self, *x, **y):
        self.check_debug()
        for f in list(self.__handlers):  # copy: a handler may connect/disconnect during emit
            try:
                if self.debug:
                    print("PSIGNAL: emit", f, len(x), x, y)
                f(*x, **y)
            except Exception:
                print("PSIGNAL: Error in signal", f)
                traceback.print_exc()
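As a rough sketch of how monitors could hang off such signals and be reachable from anywhere in a project, the example below defines signals at module level and connects a monitor's bound methods to them. To keep it runnable on its own it uses `MiniSignal`, a trimmed stand-in with the same connect/emit interface as the PSignal above. `CountingMonitor`, the signal names and the idea of a shared `events.py` module are assumptions made for illustration.

```python
class MiniSignal:
    """Stand-in exposing only connect/emit, mirroring the PSignal interface."""

    def __init__(self):
        self._handlers = []

    def connect(self, f):
        self._handlers.append(f)

    def emit(self, *args, **kwargs):
        # Iterate over a copy so handlers may connect others while running.
        for f in list(self._handlers):
            f(*args, **kwargs)


# In a real project these would live in one module (say, a hypothetical
# events.py) so that any other module can import and use the same objects.
request_finished = MiniSignal()
request_failed = MiniSignal()


class CountingMonitor:
    """Hypothetical monitor: counts events and remembers the slowest request."""

    def __init__(self):
        self.finished = 0
        self.failed = 0
        self.slowest = 0.0

    def on_finished(self, path, seconds):
        self.finished += 1
        self.slowest = max(self.slowest, seconds)

    def on_failed(self, path, error):
        self.failed += 1


monitor = CountingMonitor()
# Bound methods are callables, so they can be registered directly.
request_finished.connect(monitor.on_finished)
request_failed.connect(monitor.on_failed)

# Application code only emits; it does not know who is listening.
request_finished.emit("/home", 0.12)
request_finished.emit("/search", seconds=0.80)
request_failed.emit("/login", error=TimeoutError("upstream"))
```

Python caches imported modules, so every `import` of the module that holds the signals returns the same objects. That is what makes the system available project-wide without passing it around explicitly.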

Upvotes: 4

Maxim Krabov

Reputation: 715

Check out Reactive Python (RxPy)

ReactiveX

GitHub/python

Upvotes: 2
