Julian S.
Julian S.

Reputation: 450

How to properly combine PySide2 and pytransitions for implementing a state machine for GUI application

Background: I'd like to implement a GUI for controlling a bunch of clients (that talk to 'servers' controlling hardware like motors, cameras etc. via RPC calls) using PySide2.

Previous approach: Typically, what I'd do is to create my GUI and connect the UI signals to the Client slots and the other way round. This works perfectly fine for simpler applications.

Problem: I would like my GUI to properly reflect which calls to the clients are currently allowed. The simplest example: after executing client1.doXY() I'd like to disable the button that triggered that command and re-enable it only after doXY() has completed. While this is entirely possible with the approach above, it feels wrong once things get more complicated, e.g. when GUI elements depend on the state of multiple clients.

Approach: I therefore thought it'd be a good idea to use finite state machines as an intermediate layer between the clients and the GUI, and came across pytransitions, which looks very promising. However, I'm struggling to find the right way to combine those two worlds.

Questions:

Working example:

running state machine

Code:

import io
import logging
from time import sleep

import numpy as np
from PySide2 import QtSvg, QtWidgets
from PySide2.QtCore import Signal, Slot, QObject, QThread
from PySide2.QtWidgets import QWidget, QPushButton, QApplication
from transitions.extensions import GraphMachine

# Configure the root logger at DEBUG level for this demo script.
logging.basicConfig(level=logging.DEBUG)


class Client(QObject):
    """Stand-in for a hardware client whose calls block until they finish.

    Every slot reports its outcome by emitting one of the signals below
    instead of returning a value.
    """

    # Outcome notifications emitted by the slots of this class.
    sig_move_done = Signal()
    sig_disconnected = Signal()
    sig_connected = Signal()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    @Slot(int)
    def client_move(self, dest):
        """Simulate a blocking move to ``dest`` and signal how it ended.

        Roughly half of the calls (random draw below 0.5) are treated as
        failures and emit ``sig_disconnected``; the others emit
        ``sig_move_done``.
        """
        print(f'Client moving to {dest}...')
        sleep(3)  # placeholder for the real blocking call
        failed = np.random.rand() < 0.5
        if failed:
            message, outcome = "Error occurred during movement...", self.sig_disconnected
        else:
            message, outcome = "Movement done...", self.sig_move_done
        print(message)
        outcome.emit()

    @Slot()
    def client_disconnect(self):
        """Announce a disconnect (the actual work is left out of the demo)."""
        self.sig_disconnected.emit()

    @Slot()
    def client_connect(self):
        """Announce a successful connect (the actual work is left out of the demo)."""
        self.sig_connected.emit()


# Configuration for the pytransitions machine: the state names, the
# transition table and the extra keyword arguments passed to the machine.
states = ['ready', 'moving', 'unknown']

# One entry per trigger; 'source' may be a single state or a list of states.
# NOTE(review): the trailing underscore in 'connect_' / 'disconnect_'
# presumably avoids clashing with QObject.connect/disconnect on the model.
transitions = [
    dict(trigger='move', source='ready', dest='moving'),
    dict(trigger='stopped', source='moving', dest='ready'),
    dict(trigger='disconnect_', source=['ready', 'moving'], dest='unknown'),
    dict(trigger='error', source=['ready', 'moving'], dest='unknown'),
    dict(trigger='connect_', source='unknown', dest='ready'),
]

extra_args = {
    'initial': 'unknown',
    'title': 'Simple state machine',
    'show_conditions': True,
    'show_state_attributes': True,
}


class ClientState(QObject):
    """State-machine layer between a ``Client`` and the GUI.

    The instance itself is the pytransitions model: ``GraphMachine`` is
    created with ``model=self``, which is where the trigger methods used
    below (``disconnect_``, ``connect_``, ``stopped``, ...), the
    ``is_<state>()`` checks and ``get_graph()`` come from.

    The client is moved to a worker thread; requests are forwarded to it
    through signals so that its slots are not called directly from the
    thread that owns this object.
    """

    # machine signals
    sig_update_available = Signal()
    sig_move_requested = Signal(int)  # can this be avoided ? see self.on_enter_moving
    sig_connect_requested = Signal()  # can this be avoided ?

    def __init__(self, client, *args, **kwargs):
        """Wire ``client`` to a new state machine.

        :param client: object providing the ``sig_*`` signals and the
            ``client_move`` / ``client_connect`` slots connected below.
        Remaining arguments are passed on to ``QObject``.
        """
        super().__init__(*args, **kwargs)
        self.client = client
        # move client to separate thread
        self.worker_thread = QThread()
        self.client.moveToThread(self.worker_thread)
        self.worker_thread.start()

        # The worker thread must be stopped before it is destroyed; hook the
        # shutdown to the application's aboutToQuit signal when an
        # application object already exists.  Imported locally because the
        # module-level import does not pull in QCoreApplication.
        from PySide2.QtCore import QCoreApplication
        app = QCoreApplication.instance()
        if app is not None:
            app.aboutToQuit.connect(self.stop_worker_thread)

        self.machine = GraphMachine(model=self, states=states, transitions=transitions,
                                    show_auto_transitions=False, **extra_args, after_state_change="update_available",
                                    send_event=True)

        # connecting Client signals to state machine triggers
        self.client.sig_disconnected.connect(self.disconnect_)
        self.client.sig_connected.connect(self.connect_)
        self.client.sig_move_done.connect(self.stopped)

        # can this be avoided ? see self.on_enter_moving
        self.sig_move_requested.connect(self.client.client_move)
        self.sig_connect_requested.connect(self.client.client_connect)

    def update_available(self, *args, **kwargs):
        """Machine callback (``after_state_change``): notify listeners.

        Arguments supplied by the machine are ignored.
        """
        self.sig_update_available.emit()

    def on_enter_moving(self, event):
        """Forward the requested destination to the client.

        ``event.kwargs`` holds the keyword arguments given to the trigger;
        a missing ``dest`` falls back to 0.
        """
        print(event.kwargs)
        dest = event.kwargs.get('dest', 0)
        # calling self.client_move() directly will cause self.client_move to be called from main thread...
        # calling it via a helper signal instead:
        self.sig_move_requested.emit(dest)

    def show_graph(self, **kwargs):
        """Render the machine's graph with ``dot`` and return it as SVG bytes.

        Keyword arguments are passed through to ``get_graph``.
        """
        stream = io.BytesIO()
        self.get_graph(**kwargs).draw(stream, prog='dot', format='svg')
        return stream.getvalue()

    @Slot()
    def stop_worker_thread(self):
        """Ask the worker thread's event loop to exit and wait for it.

        Blocks until a client slot that is currently executing has returned.
        """
        self.worker_thread.quit()
        self.worker_thread.wait()


class GUI(QWidget):
    """Window showing the state graph and two buttons enabled per state."""

    def __init__(self, client_state):
        """Build the widgets and connect them to ``client_state``.

        :param client_state: object providing ``sig_update_available``,
            the ``move`` / ``connect_`` triggers, the ``is_<state>()``
            checks and ``show_graph()`` used below.
        """
        super().__init__()
        self.client_state = client_state

        # setup UI
        self.setWindowTitle("State")
        self.svgWidget = QtSvg.QSvgWidget()
        self.btn_move = QPushButton("move")
        self.btn_connect = QPushButton("(re-)connect")

        # Keep the layout in a local variable: storing it as ``self.layout``
        # would shadow the inherited QWidget.layout() method on this instance.
        layout = QtWidgets.QVBoxLayout()
        layout.addWidget(self.svgWidget)
        layout.addWidget(self.btn_move)
        layout.addWidget(self.btn_connect)
        self.setLayout(layout)

        # Connect Slots/Signals
        ## machine -> GUI
        self.client_state.sig_update_available.connect(self.update_gui)

        ## GUI --> machine
        self.btn_move.clicked.connect(lambda: self.client_state.move(dest=np.random.randint(1, 100)))
        self.btn_connect.clicked.connect(
            self.client_state.connect_)

        # update UI
        self.update_gui()

    def update_gui(self):
        """Reload the graph image and enable the buttons valid for the state."""
        print("Update model graph and GUI...")
        self.svgWidget.load(self.client_state.show_graph())

        # The states are mutually exclusive, so a single chain is enough.
        if self.client_state.is_ready():
            self.btn_move.setEnabled(True)
            self.btn_connect.setDisabled(True)
        elif self.client_state.is_moving():
            self.btn_move.setDisabled(True)
            self.btn_connect.setDisabled(True)
        elif self.client_state.is_unknown():
            self.btn_move.setDisabled(True)
            self.btn_connect.setEnabled(True)


if __name__ == "__main__":
    import sys

    # Create the application object first, then the client -> state -> GUI chain.
    application = QApplication(sys.argv)
    window = GUI(ClientState(Client()))
    window.show()
    exit_code = application.exec_()
    sys.exit(exit_code)

Upvotes: 1

Views: 1174

Answers (1)

eyllanesc
eyllanesc

Reputation: 243947

Is this generally speaking a valid design approach to have such a layer?

Yes, it is valid; complex applications often implement an FSM because it simplifies the logic.


As for simplification: in my opinion it is worth first checking whether Qt itself offers similar tools, since those integrate smoothly with Qt's elements through events and signals. In this case there are at least two options:

The State Machine Framework:

import time
from functools import partial
from PySide2 import QtCore, QtGui, QtWidgets
import numpy as np


class Client(QtCore.QObject):
    """Stand-in for a hardware client whose calls block until they finish.

    Every slot reports its outcome by emitting one of the signals below.
    """

    # Outcome notifications emitted by the slots of this class.
    sig_move_done = QtCore.Signal()
    sig_disconnected = QtCore.Signal()
    sig_connected = QtCore.Signal()

    @QtCore.Slot(int)
    def client_move(self, dest):
        """Simulate a blocking move to ``dest`` and signal how it ended.

        A random draw below 0.5 counts as a failure and emits
        ``sig_disconnected``; otherwise ``sig_move_done`` is emitted.
        """
        print(f"Client moving to {dest}...")
        time.sleep(3)  # placeholder for the real blocking call
        failed = np.random.rand() < 0.5
        if failed:
            message, outcome = "Error occurred during movement...", self.sig_disconnected
        else:
            message, outcome = "Movement done...", self.sig_move_done
        print(message)
        outcome.emit()

    @QtCore.Slot()
    def client_disconnect(self):
        """Announce a disconnect (the actual work is left out of the demo)."""
        self.sig_disconnected.emit()

    @QtCore.Slot()
    def client_connect(self):
        """Announce a successful connect (the actual work is left out of the demo)."""
        self.sig_connected.emit()


class GUI(QtWidgets.QWidget):
    """Two-button window whose button states follow a ``QStateMachine``."""

    def __init__(self, parent=None):
        """Create the widgets, the threaded client and the state machine."""
        super().__init__(parent)
        self.setWindowTitle("State")

        self.btn_move = QtWidgets.QPushButton("move")
        self.btn_connect = QtWidgets.QPushButton("(re-)connect")

        # The client is moved to a thread owned by this widget.
        self.client = Client()
        self._thread = QtCore.QThread(self)
        self._thread.start()
        self.client.moveToThread(self._thread)

        box = QtWidgets.QVBoxLayout(self)
        for button in (self.btn_move, self.btn_connect):
            box.addWidget(button)
        self.resize(320, 120)

        # states
        self.unknown_state = QtCore.QState()
        self.ready_state = QtCore.QState()
        self.moving_state = QtCore.QState()

        # transitions as (source state, triggering signal, target state)
        edges = (
            (self.ready_state, self.btn_move.clicked, self.moving_state),
            (self.moving_state, self.client.sig_move_done, self.ready_state),
            (self.ready_state, self.client.sig_disconnected, self.unknown_state),
            (self.moving_state, self.client.sig_disconnected, self.unknown_state),
            (self.unknown_state, self.btn_connect.clicked, self.ready_state),
            (self.unknown_state, self.client.sig_connected, self.ready_state),
        )
        for source, signal, target in edges:
            source.addTransition(signal, target)

        # entry handlers
        entry_handlers = (
            (self.unknown_state, self.on_unknown_state_enter),
            (self.ready_state, self.on_ready_state_enter),
            (self.moving_state, self.on_moving_state_enter),
        )
        for state, handler in entry_handlers:
            state.entered.connect(handler)

        machine = QtCore.QStateMachine(self)
        for state in (self.ready_state, self.moving_state, self.unknown_state):
            machine.addState(state)
        machine.setInitialState(self.unknown_state)
        machine.start()

    def _set_buttons(self, move_enabled, connect_enabled):
        """Enable or disable the move and connect buttons, in that order."""
        self.btn_move.setEnabled(move_enabled)
        self.btn_connect.setEnabled(connect_enabled)

    def on_unknown_state_enter(self):
        """Entering 'unknown': only (re-)connect is offered."""
        print("unknown_state")
        self._set_buttons(False, True)

    def on_ready_state_enter(self):
        """Entering 'ready': only move is offered."""
        print("ready_state")
        self._set_buttons(True, False)

    def on_moving_state_enter(self):
        """Entering 'moving': lock both buttons and start a move to a random target."""
        print("moving_state")
        self._set_buttons(False, False)
        target = np.random.randint(1, 100)
        # Deferred through a zero-delay single-shot timer rather than called inline.
        # NOTE(review): confirm this runs client_move in the client's thread
        # and not in the GUI thread.
        QtCore.QTimer.singleShot(0, partial(self.client.client_move, target))

    def closeEvent(self, event):
        """Stop the client thread and wait for it before the window closes."""
        worker = self._thread
        worker.quit()
        worker.wait()
        super().closeEvent(event)


if __name__ == "__main__":
    import sys

    # Application object first, then the window, then the event loop.
    application = QtWidgets.QApplication(sys.argv)

    window = GUI()
    window.show()

    exit_code = application.exec_()
    sys.exit(exit_code)

Qt SCXML:

Simple_State_Machine.scxml

<?xml version="1.0" encoding="UTF-8"?>
<scxml xmlns="http://www.w3.org/2005/07/scxml" version="1.0" binding="early" xmlns:qt="http://www.qt.io/2015/02/scxml-ext" name="Simple_State_Machine" qt:editorversion="4.10.0" initial="unknown">
    <!-- Three states (ready, moving, unknown); the machine starts in "unknown".
         The qt:editorinfo elements appear to hold only diagram geometry and
         colours written by the editor. -->
    <qt:editorinfo initialGeometry="150.82;359.88;-20;-20;40;40"/>
    <!-- ready: event "move" leads to moving, event "disconnect" leads to unknown. -->
    <state id="ready">
        <qt:editorinfo stateColor="#ff974f" geometry="425.83;190.46;-60;-50;120;100" scenegeometry="425.83;190.46;365.83;140.46;120;100"/>
        <transition type="internal" event="move" target="moving">
            <qt:editorinfo endTargetFactors="35.02;9.52" movePoint="-34.84;14.59" startTargetFactors="32.33;90.16"/>
        </transition>
        <transition type="internal" event="disconnect" target="unknown">
            <qt:editorinfo endTargetFactors="91.87;60.92" movePoint="9.38;9.36" startTargetFactors="6.25;63.37"/>
        </transition>
    </state>
    <!-- unknown: event "connect" leads to ready. -->
    <state id="unknown">
        <qt:editorinfo stateColor="#89725b" geometry="150.82;190.46;-60;-50;120;100" scenegeometry="150.82;190.46;90.82;140.46;120;100"/>
        <transition type="internal" target="ready" event="connect">
            <qt:editorinfo endTargetFactors="6.34;41.14" movePoint="0;7.30" startTargetFactors="91.13;39.41"/>
        </transition>
    </state>
    <!-- moving: event "disconnect" leads to unknown, event "stopped" leads to ready. -->
    <state id="moving">
        <qt:editorinfo stateColor="#a508d0" geometry="425.83;344.53;-60;-50;120;100" scenegeometry="425.83;344.53;365.83;294.53;120;100"/>
        <transition type="internal" event="disconnect" target="unknown">
            <qt:editorinfo movePoint="2.08;17.72"/>
        </transition>
        <transition type="internal" event="stopped" target="ready">
            <qt:editorinfo endTargetFactors="68.30;90.08" movePoint="62.50;10.32" startTargetFactors="68.69;5.74"/>
        </transition>
    </state>
</scxml>

enter image description here

import os
import time
from functools import partial
from PySide2 import QtCore, QtGui, QtWidgets, QtScxml
import numpy as np


class Client(QtCore.QObject):
    """Stand-in for a hardware client whose calls block until they finish.

    Every slot reports its outcome by emitting one of the signals below.
    """

    # Outcome notifications emitted by the slots of this class.
    sig_move_done = QtCore.Signal()
    sig_disconnected = QtCore.Signal()
    sig_connected = QtCore.Signal()

    @QtCore.Slot(int)
    def client_move(self, dest):
        """Simulate a blocking move to ``dest`` and signal how it ended.

        A random draw below 0.5 counts as a failure and emits
        ``sig_disconnected``; otherwise ``sig_move_done`` is emitted.
        """
        print(f"Client moving to {dest}...")
        time.sleep(3)  # placeholder for the real blocking call
        failed = np.random.rand() < 0.5
        if failed:
            message, outcome = "Error occurred during movement...", self.sig_disconnected
        else:
            message, outcome = "Movement done...", self.sig_move_done
        print(message)
        outcome.emit()

    @QtCore.Slot()
    def client_disconnect(self):
        """Announce a disconnect (the actual work is left out of the demo)."""
        self.sig_disconnected.emit()

    @QtCore.Slot()
    def client_connect(self):
        """Announce a successful connect (the actual work is left out of the demo)."""
        self.sig_connected.emit()


class GUI(QtWidgets.QWidget):
    """Two-button window driven by a state machine loaded from an SCXML file."""

    def __init__(self, parent=None):
        """Create the widgets, the threaded client and the SCXML machine."""
        super().__init__(parent)
        self.setWindowTitle("State")

        self.btn_move = QtWidgets.QPushButton("move")
        self.btn_connect = QtWidgets.QPushButton("(re-)connect")

        # The client is moved to a thread owned by this widget.
        self.client = Client()
        self._thread = QtCore.QThread(self)
        self._thread.start()
        self.client.moveToThread(self._thread)

        box = QtWidgets.QVBoxLayout(self)
        for button in (self.btn_move, self.btn_connect):
            box.addWidget(button)
        self.resize(320, 120)

        # The SCXML document is looked up in the directory of this script.
        script_dir = os.path.dirname(os.path.realpath(__file__))
        machine = QtScxml.QScxmlStateMachine.fromFile(
            os.path.join(script_dir, "Simple_State_Machine.scxml")
        )
        machine.setParent(self)

        for problem in machine.parseErrors():
            print(problem.toString())

        # state id -> slot that is told whether the state became active
        state_slots = (
            ("unknown", QtCore.SLOT("on_unknown_state_enter(bool)")),
            ("ready", QtCore.SLOT("on_ready_state_enter(bool)")),
            ("moving", QtCore.SLOT("on_moving_state_enter(bool)")),
        )
        for state_id, slot in state_slots:
            machine.connectToState(state_id, self, slot)

        # signal -> name of the event submitted to the machine
        event_sources = (
            (self.btn_connect.clicked, "connect"),
            (self.btn_move.clicked, "move"),
            (self.client.sig_disconnected, "disconnect"),
            (self.client.sig_connected, "connect"),
            (self.client.sig_move_done, "stopped"),
        )
        for signal, event_name in event_sources:
            signal.connect(partial(machine.submitEvent, event_name))

        machine.start()

    @QtCore.Slot(bool)
    def on_unknown_state_enter(self, active):
        """When 'unknown' becomes active, offer only (re-)connect."""
        if not active:
            return
        print("unknown_state")
        self.btn_move.setEnabled(False)
        self.btn_connect.setEnabled(True)

    @QtCore.Slot(bool)
    def on_ready_state_enter(self, active):
        """When 'ready' becomes active, offer only move."""
        if not active:
            return
        print("ready_state")
        self.btn_move.setEnabled(True)
        self.btn_connect.setEnabled(False)

    @QtCore.Slot(bool)
    def on_moving_state_enter(self, active):
        """When 'moving' becomes active, lock both buttons and start a move."""
        if not active:
            return
        print("moving_state")
        self.btn_move.setEnabled(False)
        self.btn_connect.setEnabled(False)
        target = np.random.randint(1, 100)
        # Deferred through a zero-delay single-shot timer rather than called inline.
        # NOTE(review): confirm this runs client_move in the client's thread
        # and not in the GUI thread.
        QtCore.QTimer.singleShot(0, partial(self.client.client_move, target))

    def closeEvent(self, event):
        """Stop the client thread and wait for it before the window closes."""
        worker = self._thread
        worker.quit()
        worker.wait()
        super().closeEvent(event)


if __name__ == "__main__":
    import sys

    # Application object first, then the window, then the event loop.
    application = QtWidgets.QApplication(sys.argv)

    window = GUI()
    window.show()

    exit_code = application.exec_()
    sys.exit(exit_code)

Upvotes: 2

Related Questions