Bastien Mazeran

Reputation: 71

pytest-qt waitSignal for long running computation running in a thread pool

I have successfully implemented a Python Qt app based off this very nice tutorial.

I am now writing tests with pytest-qt for a button that triggers a long-running computation, which emits a signal when it finishes. I would like to use waitSignal as documented here.

def test_long_computation(qtbot):
    app = Application()

    # Watch for the app.worker.finished signal, then start the worker.
    with qtbot.waitSignal(app.worker.finished, timeout=10000) as blocker:
        blocker.connect(app.worker.failed)  # Can add other signals to blocker
        app.worker.start()
        # Test will block at this point until either the "finished" or the
        # "failed" signal is emitted. If 10 seconds passed without a signal,
        # TimeoutError will be raised.

    assert_application_results(app)

When the button is clicked, this function is executed:

def on_button_click_function(self):
    """
    start thread pool to run function
    """
    # Pass the function to execute
    worker = Worker(self.execute_check_urls)
    worker.signals.result.connect(self.print_output)  # type: ignore
    worker.signals.finished.connect(self.thread_complete)  # type: ignore
    worker.signals.progress.connect(self.progress_fn)  # type: ignore
    # Execute
    log_info("Starting thread pool worker ...")
    self.threadpool.start(worker)

And when the thread completes, a signal is emitted:

def thread_complete(self):
    main_window = self.find_main_window()
    if main_window:
        main_window.button_click_signal.emit(
            self.results
        )
        log_info(
            f"Emitted signal for button click function: {self.results}"
        )

Below is the init function of the main class:

class MyClass:

    def __init__(
        self,
        *args,
        **kwargs
    ):
        super(MyClass, self).__init__(*args, **kwargs)
        self.threadpool = QThreadPool()
        print(
            "Multithreading with max %d threads"
            % self.threadpool.maxThreadCount()
        )

And the worker class, which is a QRunnable:

import sys
import traceback


class Worker(QRunnable):

    def __init__(
        self,
        fn,
        *args,
        **kwargs
    ):
        super(Worker, self).__init__()
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.signals = WorkerSignals()
        # Hand the progress signal to the wrapped function as a callback
        self.kwargs['progress_callback'] = self.signals.progress

    @Slot()
    def run(self):
        try:
            result = self.fn(*self.args, **self.kwargs)
        except Exception:
            traceback.print_exc()
            exctype, value = sys.exc_info()[:2]
            self.signals.error.emit(
                (exctype, value, traceback.format_exc())
            )
        else:
            self.signals.result.emit(result)
        finally:
            # Emitted whether the function succeeded or raised
            self.signals.finished.emit()

I would appreciate some guidance on how to access the worker object from the thread pool. I also tried qtbot.mouseClick(), which triggers the function, but the signal is never emitted.
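
One possible direction, sketched under assumptions: because the worker is a local variable inside on_button_click_function, a test may not need to reach it at all. It could instead wait on the window-level button_click_signal that thread_complete emits. The sketch below is Qt-free and uses only the standard library to show the "connect first, trigger, then block with a timeout" pattern. Signal, Window, and wait_signal are hypothetical stand-ins written for illustration and are not part of pytest-qt or Qt.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class Signal:
    """Tiny stand-in for a Qt signal: stores callbacks and calls them on emit."""

    def __init__(self):
        self._slots = []

    def connect(self, slot):
        self._slots.append(slot)

    def emit(self, *args):
        for slot in list(self._slots):
            slot(*args)


class Window:
    """Hypothetical stand-in for the main window; only the wiring is modelled."""

    def __init__(self):
        self.button_click_signal = Signal()
        self.pool = ThreadPoolExecutor(max_workers=1)

    def on_button_click(self):
        # The submitted job stays local, as the worker does in the question;
        # callers only ever observe the window-level signal.
        future = self.pool.submit(lambda: ["https://example.com: ok"])
        future.add_done_callback(
            lambda f: self.button_click_signal.emit(f.result())
        )


def wait_signal(signal, trigger, timeout=10.0):
    """Connect to the signal, run the trigger, then block until emit or timeout."""
    received = []
    done = threading.Event()

    def on_emit(*args):
        received.append(args)
        done.set()

    signal.connect(on_emit)  # connect before triggering to avoid missing the emit
    trigger()
    if not done.wait(timeout):
        raise TimeoutError("signal was not emitted within the timeout")
    return received[0]
```

With pytest-qt itself, the analogous shape would presumably be a `with qtbot.waitSignal(main_window.button_click_signal, timeout=10000):` block wrapping the qtbot.mouseClick() call, assuming the test can obtain the same main window instance that find_main_window() returns.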

Upvotes: 0

Views: 298

Answers (0)
