Reputation: 450
Background: I'd like to implement a GUI for controlling a bunch of clients (that talk to 'servers' controlling hardware like motors, cameras etc. via RPC calls) using PySide2.
Previous approach: Typically, what I'd do is to create my GUI and connect the UI signals to the Client slots and the other way round. This works perfectly fine for simpler applications.
Problem: I would like my GUI to represent allowed calls to the clients properly. The most simple example: after executing client1.doXY()
I'd like to disable the button that executed that command and reactivate it only after doZY()
is completed. While this is totally possible with the approach above it feels wrong when things get more complicated: e.g. when GUI elements depend on the state of multiple clients.
Approach: I therefore thought it'd be a good idea to use finite state machines as an intermediate layer between the clients and the GUI, and came across pytransitions, which looks very promising. However, I'm struggling to find the right way to combine those two worlds.
Questions:
Generally speaking, is it a valid design approach to have such a layer?
In particular, as shown in the working code example, I have to move the client to a separate thread to keep the GUI from freezing while the client is performing a blocking call. While my code works fine, it requires some overhead in creating additional Qt signals to connect the ClientState
and the Client
object.
Can this be done more elegantly (i.e. no additional xy_requested signal, but somehow a direct call from the ClientState
to the Client
functions that still calls the Client
function in the Client
thread and not the main thread ?
Working example:
Code:
import io
import logging
from time import sleep
import numpy as np
from PySide2 import QtSvg, QtWidgets
from PySide2.QtCore import Signal, Slot, QObject, QThread
from PySide2.QtWidgets import QWidget, QPushButton, QApplication
from transitions.extensions import GraphMachine
logging.basicConfig(level=logging.DEBUG)
class Client(QObject):
    """Example client whose slots report their outcome through Qt signals."""

    # Outcome notifications emitted by the slots below.
    sig_move_done = Signal()
    sig_disconnected = Signal()
    sig_connected = Signal()

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to ``QObject``."""
        super().__init__(*args, **kwargs)

    @Slot(int)
    def client_move(self, dest):
        """Run a simulated blocking move to ``dest`` and emit the result.

        Blocks for three seconds. Afterwards a random draw below 0.5 is
        treated as a failure and ``sig_disconnected`` is emitted; otherwise
        ``sig_move_done`` is emitted.
        """
        print(f'Client moving to {dest}...')
        sleep(3)  # placeholder for the real blocking call
        failed = np.random.rand() < 0.5
        if failed:
            print("Error occurred during movement...")
            self.sig_disconnected.emit()
            return
        print("Movement done...")
        self.sig_move_done.emit()

    @Slot()
    def client_disconnect(self):
        """Announce a disconnect by emitting ``sig_disconnected``."""
        # The real disconnect work would go here; the signal reports success.
        self.sig_disconnected.emit()

    @Slot()
    def client_connect(self):
        """Announce a connect by emitting ``sig_connected``."""
        # The real connect work would go here; the signal reports success.
        self.sig_connected.emit()
# Configuration consumed by the transitions state machine: the state names,
# the (trigger, source, dest) table, and extra keyword arguments.
states = ['ready', 'moving', 'unknown']

# Each row is (trigger, source state(s), destination state).
transitions = [
    dict(trigger=trigger, source=source, dest=dest)
    for trigger, source, dest in (
        ('move', 'ready', 'moving'),
        ('stopped', 'moving', 'ready'),
        ('disconnect_', ['ready', 'moving'], 'unknown'),
        ('error', ['ready', 'moving'], 'unknown'),
        ('connect_', 'unknown', 'ready'),
    )
]

extra_args = {
    'initial': 'unknown',
    'title': 'Simple state machine',
    'show_conditions': True,
    'show_state_attributes': True,
}
class ClientState(QObject):
    """Finite-state-machine layer between the GUI and a ``Client``.

    The instance is the model of a ``GraphMachine`` built from the
    module-level ``states``, ``transitions`` and ``extra_args``. The wrapped
    client is moved to a dedicated ``QThread``. Requests are forwarded to the
    client through signals, and the client's result signals are connected to
    the machine's triggers.
    """

    # machine signals
    sig_update_available = Signal()   # emitted after every state change
    sig_move_requested = Signal(int)  # forwards a move request to the client
    sig_connect_requested = Signal()  # forwards a connect request to the client

    def __init__(self, client, *args, **kwargs):
        """Wrap ``client``, start its worker thread and wire up the machine.

        Args:
            client: object providing the ``sig_*`` signals and ``client_*``
                slots used below; it is moved to ``self.worker_thread``.
            *args, **kwargs: forwarded unchanged to ``QObject``.
        """
        super().__init__(*args, **kwargs)
        self.client = client

        # move client to separate thread
        self.worker_thread = QThread()
        self.client.moveToThread(self.worker_thread)
        self.worker_thread.start()

        self.machine = GraphMachine(model=self, states=states, transitions=transitions,
                                    show_auto_transitions=False, **extra_args,
                                    after_state_change="update_available",
                                    send_event=True)

        # connecting Client signals to state machine triggers
        self.client.sig_disconnected.connect(self.disconnect_)
        self.client.sig_connected.connect(self.connect_)
        self.client.sig_move_done.connect(self.stopped)

        # Callback named in ``after_state_change``; it ignores whatever the
        # machine passes and just notifies listeners.
        self.update_available = lambda *args, **kwargs: self.sig_update_available.emit()

        # Requests reach the client through signals rather than direct calls
        # (see on_enter_moving).
        self.sig_move_requested.connect(self.client.client_move)
        self.sig_connect_requested.connect(self.client.client_connect)

        # Stop the worker thread when the application quits; without this the
        # started QThread is never quit or waited on.
        app = QApplication.instance()
        if app is not None:
            app.aboutToQuit.connect(self.shutdown)

    @Slot()
    def shutdown(self):
        """Stop the worker thread's event loop and wait until it has finished.

        Safe to call more than once. ``wait()`` blocks until a client slot
        that is currently executing in the worker thread has returned.
        """
        if self.worker_thread.isRunning():
            self.worker_thread.quit()
            self.worker_thread.wait()

    def on_enter_moving(self, event):
        """Forward the requested destination to the client on entering 'moving'.

        Args:
            event: event object passed by the machine (``send_event=True``);
                the destination is read from ``event.kwargs['dest']`` and
                defaults to 0 when absent.
        """
        print(event.kwargs)
        dest = event.kwargs.get('dest', 0)
        # calling self.client.client_move() directly would run it in the
        # caller's (main) thread, so it is requested via a helper signal:
        self.sig_move_requested.emit(dest)

    def show_graph(self, **kwargs):
        """Render the machine's graph with 'dot' and return the SVG as bytes.

        Args:
            **kwargs: forwarded to ``get_graph``.
        """
        stream = io.BytesIO()
        self.get_graph(**kwargs).draw(stream, prog='dot', format='svg')
        return stream.getvalue()
class GUI(QWidget):
    """Window showing the state graph and the buttons allowed in each state."""

    def __init__(self, client_state):
        """Build the widgets and connect them to ``client_state``.

        Args:
            client_state: state-machine model providing ``move``,
                ``connect_``, ``show_graph``, the ``is_<state>`` checks and
                the ``sig_update_available`` signal.
        """
        super().__init__()
        self.client_state = client_state

        # --- widgets ---
        self.setWindowTitle("State")
        self.svgWidget = QtSvg.QSvgWidget()
        self.btn_move = QPushButton("move")
        self.btn_connect = QPushButton("(re-)connect")

        # NOTE: this attribute shadows the inherited QWidget.layout() method
        # on the instance; kept as-is to preserve the existing attribute.
        self.layout = QtWidgets.QVBoxLayout()
        for widget in (self.svgWidget, self.btn_move, self.btn_connect):
            self.layout.addWidget(widget)
        self.setLayout(self.layout)

        # --- machine -> GUI ---
        self.client_state.sig_update_available.connect(self.update_gui)

        # --- GUI -> machine ---
        def request_move():
            # Each click requests a move to a random destination in [1, 100).
            self.client_state.move(dest=np.random.randint(1, 100))

        self.btn_move.clicked.connect(request_move)
        self.btn_connect.clicked.connect(self.client_state.connect_)

        # Bring the widgets in line with the initial state.
        self.update_gui()

    def update_gui(self):
        """Reload the state graph and enable/disable buttons for the current state."""
        print("Update model graph and GUI...")
        svg_bytes = self.client_state.show_graph()
        self.svgWidget.load(svg_bytes)

        # (state check, move button enabled, connect button enabled)
        button_rules = (
            (self.client_state.is_ready, True, False),
            (self.client_state.is_moving, False, False),
            (self.client_state.is_unknown, False, True),
        )
        for is_active, move_enabled, connect_enabled in button_rules:
            if is_active():
                self.btn_move.setEnabled(move_enabled)
                self.btn_connect.setEnabled(connect_enabled)
if __name__ == "__main__":
    import sys

    app = QApplication(sys.argv)

    # Wire the layers together: client -> state machine -> GUI.
    client = Client()
    client_state = ClientState(client)
    gui = GUI(client_state)
    gui.show()

    # Run the event loop and hand its return code to the interpreter.
    exit_code = app.exec_()
    sys.exit(exit_code)
Upvotes: 1
Views: 1174
Reputation: 243947
Yes, it is valid; in complex applications an FSM is often used because it simplifies the logic.
As for simplification: IMHO it is worth first checking whether Qt itself offers a similar tool, since such tools integrate well with Qt's elements through events and signals. In this case there are at least 2 options:
import time
from functools import partial
from PySide2 import QtCore, QtGui, QtWidgets
import numpy as np
class Client(QtCore.QObject):
    """Example client whose slots report their outcome through Qt signals."""

    # Outcome notifications emitted by the slots below.
    sig_move_done = QtCore.Signal()
    sig_disconnected = QtCore.Signal()
    sig_connected = QtCore.Signal()

    @QtCore.Slot(int)
    def client_move(self, dest):
        """Run a simulated blocking move to ``dest`` and emit the result.

        Blocks for three seconds, then emits ``sig_move_done`` unless a
        random draw falls below 0.5, in which case ``sig_disconnected`` is
        emitted instead.
        """
        print(f"Client moving to {dest}...")
        time.sleep(3)  # placeholder for the real blocking call
        succeeded = not (np.random.rand() < 0.5)
        if succeeded:
            print("Movement done...")
            self.sig_move_done.emit()
            return
        print("Error occurred during movement...")
        self.sig_disconnected.emit()

    @QtCore.Slot()
    def client_disconnect(self):
        """Announce a disconnect by emitting ``sig_disconnected``."""
        # The real disconnect work would go here; the signal reports success.
        self.sig_disconnected.emit()

    @QtCore.Slot()
    def client_connect(self):
        """Announce a connect by emitting ``sig_connected``."""
        # The real connect work would go here; the signal reports success.
        self.sig_connected.emit()
class GUI(QtWidgets.QWidget):
    """Window whose buttons are driven by a ``QStateMachine``.

    A ``Client`` is created and moved to a worker ``QThread``. Three states
    (unknown, ready, moving) are switched by button clicks and client
    signals, and each state's ``entered`` signal updates the buttons.
    """

    # Emitted on entering the moving state; carries the destination.
    move_requested = QtCore.Signal(int)

    def __init__(self, parent=None):
        """Build the widgets, the worker thread and the state machine.

        Args:
            parent: optional parent widget, forwarded to ``QWidget``.
        """
        super().__init__(parent)
        self.setWindowTitle("State")

        self.btn_move = QtWidgets.QPushButton("move")
        self.btn_connect = QtWidgets.QPushButton("(re-)connect")

        self.client = Client()
        self._thread = QtCore.QThread(self)
        self._thread.start()
        self.client.moveToThread(self._thread)

        # The move is requested through a signal connected to the client's
        # slot rather than by handing Qt a receiver-less callable, so the
        # blocking call is delivered to the thread the client lives in.
        self.move_requested.connect(self.client.client_move)

        lay = QtWidgets.QVBoxLayout(self)
        lay.addWidget(self.btn_move)
        lay.addWidget(self.btn_connect)
        self.resize(320, 120)

        # states
        self.unknown_state = QtCore.QState()
        self.ready_state = QtCore.QState()
        self.moving_state = QtCore.QState()

        # transitions
        self.ready_state.addTransition(self.btn_move.clicked, self.moving_state)
        self.moving_state.addTransition(self.client.sig_move_done, self.ready_state)
        self.ready_state.addTransition(self.client.sig_disconnected, self.unknown_state)
        self.moving_state.addTransition(self.client.sig_disconnected, self.unknown_state)
        self.unknown_state.addTransition(self.btn_connect.clicked, self.ready_state)
        self.unknown_state.addTransition(self.client.sig_connected, self.ready_state)

        # per-state button updates
        self.unknown_state.entered.connect(self.on_unknown_state_enter)
        self.ready_state.entered.connect(self.on_ready_state_enter)
        self.moving_state.entered.connect(self.on_moving_state_enter)

        state_machine = QtCore.QStateMachine(self)
        state_machine.addState(self.ready_state)
        state_machine.addState(self.moving_state)
        state_machine.addState(self.unknown_state)
        state_machine.setInitialState(self.unknown_state)
        state_machine.start()

    def on_unknown_state_enter(self):
        """Unknown state: only (re-)connecting is allowed."""
        print("unknown_state")
        self.btn_move.setDisabled(True)
        self.btn_connect.setEnabled(True)

    def on_ready_state_enter(self):
        """Ready state: only moving is allowed."""
        print("ready_state")
        self.btn_move.setEnabled(True)
        self.btn_connect.setDisabled(True)

    def on_moving_state_enter(self):
        """Moving state: disable both buttons and request a random move."""
        print("moving_state")
        self.btn_move.setDisabled(True)
        self.btn_connect.setDisabled(True)
        # Random destination in [1, 100), converted to a plain int for the
        # Signal(int) payload.
        dest = int(np.random.randint(1, 100))
        self.move_requested.emit(dest)

    def closeEvent(self, event):
        """Stop the worker thread and wait for it before the window closes."""
        self._thread.quit()
        self._thread.wait()
        super().closeEvent(event)
if __name__ == "__main__":
    import sys

    app = QtWidgets.QApplication(sys.argv)

    # Create and show the main window.
    w = GUI()
    w.show()

    # Run the event loop and hand its return code to the interpreter.
    exit_code = app.exec_()
    sys.exit(exit_code)
Simple_State_Machine.scxml
<?xml version="1.0" encoding="UTF-8"?>
<!--
SCXML description of the "Simple_State_Machine" chart.
States: ready, unknown, moving; the initial state is "unknown".
Events used by the transitions: move, disconnect, connect, stopped.
The qt:editorinfo elements carry geometry/colour attributes and appear to be
graphical-editor layout metadata rather than part of the chart's logic.
-->
<scxml xmlns="http://www.w3.org/2005/07/scxml" version="1.0" binding="early" xmlns:qt="http://www.qt.io/2015/02/scxml-ext" name="Simple_State_Machine" qt:editorversion="4.10.0" initial="unknown">
<qt:editorinfo initialGeometry="150.82;359.88;-20;-20;40;40"/>
<!-- ready: "move" leads to moving, "disconnect" leads to unknown -->
<state id="ready">
<qt:editorinfo stateColor="#ff974f" geometry="425.83;190.46;-60;-50;120;100" scenegeometry="425.83;190.46;365.83;140.46;120;100"/>
<transition type="internal" event="move" target="moving">
<qt:editorinfo endTargetFactors="35.02;9.52" movePoint="-34.84;14.59" startTargetFactors="32.33;90.16"/>
</transition>
<transition type="internal" event="disconnect" target="unknown">
<qt:editorinfo endTargetFactors="91.87;60.92" movePoint="9.38;9.36" startTargetFactors="6.25;63.37"/>
</transition>
</state>
<!-- unknown (initial): "connect" leads to ready -->
<state id="unknown">
<qt:editorinfo stateColor="#89725b" geometry="150.82;190.46;-60;-50;120;100" scenegeometry="150.82;190.46;90.82;140.46;120;100"/>
<transition type="internal" target="ready" event="connect">
<qt:editorinfo endTargetFactors="6.34;41.14" movePoint="0;7.30" startTargetFactors="91.13;39.41"/>
</transition>
</state>
<!-- moving: "disconnect" leads to unknown, "stopped" leads back to ready -->
<state id="moving">
<qt:editorinfo stateColor="#a508d0" geometry="425.83;344.53;-60;-50;120;100" scenegeometry="425.83;344.53;365.83;294.53;120;100"/>
<transition type="internal" event="disconnect" target="unknown">
<qt:editorinfo movePoint="2.08;17.72"/>
</transition>
<transition type="internal" event="stopped" target="ready">
<qt:editorinfo endTargetFactors="68.30;90.08" movePoint="62.50;10.32" startTargetFactors="68.69;5.74"/>
</transition>
</state>
</scxml>
import os
import time
from functools import partial
from PySide2 import QtCore, QtGui, QtWidgets, QtScxml
import numpy as np
class Client(QtCore.QObject):
    """Example client whose slots report their outcome through Qt signals."""

    # Outcome notifications emitted by the slots below.
    sig_move_done = QtCore.Signal()
    sig_disconnected = QtCore.Signal()
    sig_connected = QtCore.Signal()

    @QtCore.Slot(int)
    def client_move(self, dest):
        """Run a simulated blocking move to ``dest`` and emit the result.

        Blocks for three seconds. A random draw below 0.5 counts as an error
        and emits ``sig_disconnected``; any other draw emits
        ``sig_move_done``.
        """
        print(f"Client moving to {dest}...")
        time.sleep(3)  # placeholder for the real blocking call
        draw = np.random.rand()
        if draw < 0.5:
            print("Error occurred during movement...")
            self.sig_disconnected.emit()
            return
        print("Movement done...")
        self.sig_move_done.emit()

    @QtCore.Slot()
    def client_disconnect(self):
        """Announce a disconnect by emitting ``sig_disconnected``."""
        # The real disconnect work would go here; the signal reports success.
        self.sig_disconnected.emit()

    @QtCore.Slot()
    def client_connect(self):
        """Announce a connect by emitting ``sig_connected``."""
        # The real connect work would go here; the signal reports success.
        self.sig_connected.emit()
class GUI(QtWidgets.QWidget):
    """Window whose buttons are driven by a ``QScxmlStateMachine``.

    A ``Client`` is created and moved to a worker ``QThread``. The state
    chart is loaded from ``Simple_State_Machine.scxml`` next to this file;
    button clicks and client signals are submitted to it as events, and the
    state-activity callbacks update the buttons.
    """

    # Emitted when the moving state becomes active; carries the destination.
    move_requested = QtCore.Signal(int)

    def __init__(self, parent=None):
        """Build the widgets, the worker thread and the SCXML state machine.

        Args:
            parent: optional parent widget, forwarded to ``QWidget``.
        """
        super().__init__(parent)
        self.setWindowTitle("State")

        self.btn_move = QtWidgets.QPushButton("move")
        self.btn_connect = QtWidgets.QPushButton("(re-)connect")

        self.client = Client()
        self._thread = QtCore.QThread(self)
        self._thread.start()
        self.client.moveToThread(self._thread)

        # The move is requested through a signal connected to the client's
        # slot rather than by handing Qt a receiver-less callable, so the
        # blocking call is delivered to the thread the client lives in.
        self.move_requested.connect(self.client.client_move)

        lay = QtWidgets.QVBoxLayout(self)
        lay.addWidget(self.btn_move)
        lay.addWidget(self.btn_connect)
        self.resize(320, 120)

        # Load the chart that sits in the same directory as this file.
        current_dir = os.path.dirname(os.path.realpath(__file__))
        filename = os.path.join(current_dir, "Simple_State_Machine.scxml")
        machine = QtScxml.QScxmlStateMachine.fromFile(filename)
        machine.setParent(self)
        # Parse errors are only reported; construction continues regardless.
        for error in machine.parseErrors():
            print(error.toString())

        # state activity -> GUI
        machine.connectToState("unknown", self, QtCore.SLOT("on_unknown_state_enter(bool)"))
        machine.connectToState("ready", self, QtCore.SLOT("on_ready_state_enter(bool)"))
        machine.connectToState("moving", self, QtCore.SLOT("on_moving_state_enter(bool)"))

        # GUI and client signals -> state machine events
        self.btn_connect.clicked.connect(partial(machine.submitEvent, "connect"))
        self.btn_move.clicked.connect(partial(machine.submitEvent, "move"))
        self.client.sig_disconnected.connect(partial(machine.submitEvent, "disconnect"))
        self.client.sig_connected.connect(partial(machine.submitEvent, "connect"))
        self.client.sig_move_done.connect(partial(machine.submitEvent, "stopped"))

        machine.start()

    @QtCore.Slot(bool)
    def on_unknown_state_enter(self, active):
        """When 'unknown' becomes active, allow only (re-)connecting."""
        if active:
            print("unknown_state")
            self.btn_move.setDisabled(True)
            self.btn_connect.setEnabled(True)

    @QtCore.Slot(bool)
    def on_ready_state_enter(self, active):
        """When 'ready' becomes active, allow only moving."""
        if active:
            print("ready_state")
            self.btn_move.setEnabled(True)
            self.btn_connect.setDisabled(True)

    @QtCore.Slot(bool)
    def on_moving_state_enter(self, active):
        """When 'moving' becomes active, disable both buttons and request a move."""
        if active:
            print("moving_state")
            self.btn_move.setDisabled(True)
            self.btn_connect.setDisabled(True)
            # Random destination in [1, 100), converted to a plain int for
            # the Signal(int) payload.
            dest = int(np.random.randint(1, 100))
            self.move_requested.emit(dest)

    def closeEvent(self, event):
        """Stop the worker thread and wait for it before the window closes."""
        self._thread.quit()
        self._thread.wait()
        super().closeEvent(event)
if __name__ == "__main__":
    import sys

    app = QtWidgets.QApplication(sys.argv)

    # Create and show the main window.
    w = GUI()
    w.show()

    # Run the event loop and hand its return code to the interpreter.
    exit_code = app.exec_()
    sys.exit(exit_code)
Upvotes: 2