I have successfully implemented a Python Qt app based on this very nice tutorial.
I am now writing tests with pytest-qt, specifically to test a button that triggers a long-running computation, which emits a signal when it finishes. I would like to use waitSignal as documented here:
def test_long_computation(qtbot):
    app = Application()

    # Watch for the app.worker.finished signal, then start the worker.
    with qtbot.waitSignal(app.worker.finished, timeout=10000) as blocker:
        blocker.connect(app.worker.failed)  # Can add other signals to blocker
        app.worker.start()
        # Test will block at this point until either the "finished" or the
        # "failed" signal is emitted. If 10 seconds passed without a signal,
        # TimeoutError will be raised.

    assert_application_results(app)
When the button is clicked, this function is executed:
def on_button_click_function(self):
    """
    start thread pool to run function
    """
    # Pass the function to execute
    worker = Worker(self.execute_check_urls)
    worker.signals.result.connect(self.print_output)  # type: ignore
    worker.signals.finished.connect(self.thread_complete)  # type: ignore
    worker.signals.progress.connect(self.progress_fn)  # type: ignore
    # Execute
    log_info("Starting thread pool worker ...")
    self.threadpool.start(worker)
When the thread completes, a signal is emitted:
def thread_complete(self):
    main_window = self.find_main_window()
    if main_window:
        main_window.button_click_signal.emit(
            self.results
        )
        log_info(
            f"Emitted signal for button click function: {self.results}"
        )
Below is the init function of the main class:
class MyClass:
    def __init__(
        self,
        *args,
        **kwargs
    ):
        super(MyClass, self).__init__(*args, **kwargs)
        self.threadpool = QThreadPool()
        print(
            "Multithreading with max %d threads"
            % self.threadpool.maxThreadCount()
        )
And the worker class, which is a QRunnable:
class Worker(QRunnable):
    def __init__(
        self,
        fn,
        *args,
        **kwargs
    ):
        super(Worker, self).__init__()
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.signals = WorkerSignals()
        # Hand the progress signal to the wrapped function as a keyword argument
        self.kwargs['progress_callback'] = self.signals.progress

    @Slot()
    def run(self):
        try:
            result = self.fn(*self.args, **self.kwargs)
        except Exception:
            traceback.print_exc()
            exctype, value = sys.exc_info()[:2]
            self.signals.error.emit(
                (exctype, value, traceback.format_exc())
            )
        else:
            self.signals.result.emit(result)
        finally:
            # Always emitted, whether the function succeeded or raised
            self.signals.finished.emit()
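I would appreciate some guidance on how to access the worker object from the thread pool, since the worker is created locally inside on_button_click_function and is not stored on the class. I also tried qtbot.mouseClick(), which triggers the function, but the signal is never emitted.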
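To make the goal concrete, here is a rough Qt-free sketch of the pattern I am trying to reproduce in the test: trigger the action, then block until the worker reports completion or a timeout expires. It uses only the standard library, so it is an analogy and not pytest-qt code. threading.Event stands in for the finished signal, ThreadPoolExecutor stands in for QThreadPool, and the names click_and_wait and slow_check are hypothetical.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class Worker:
    """Qt-free stand-in for the QRunnable worker (illustration only)."""

    def __init__(self, fn, *args, **kwargs):
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.result = None
        # Stands in for worker.signals.finished
        self.finished = threading.Event()

    def run(self):
        try:
            self.result = self.fn(*self.args, **self.kwargs)
        finally:
            # Always announce completion, like the finally block in run()
            self.finished.set()


def click_and_wait(pool, fn, timeout=10.0):
    """Start the worker (the "button click") and block until it finishes."""
    worker = Worker(fn)
    pool.submit(worker.run)
    # Roughly what waitSignal does: wait for completion or give up
    if not worker.finished.wait(timeout):
        raise TimeoutError("worker did not finish in time")
    return worker.result


def slow_check():
    # Hypothetical placeholder for execute_check_urls
    threading.Event().wait(0.05)  # simulate a short computation
    return ["ok"]


with ThreadPoolExecutor(max_workers=2) as pool:
    results = click_and_wait(pool, slow_check, timeout=5.0)
```

In this sketch the caller keeps a reference to the worker, which is exactly what I am missing in the real app, where the worker only exists inside the click handler.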