Jeroen De Geeter
Jeroen De Geeter

Reputation: 19

How to efficiently keep a stable frame rate video stream in a PyQt application?

I am developing a PyQt (PySide6) application that needs to display and store multiple camera streams at the same time. The display of the camera streams goes well; however, storing these streams seems to slow down the application significantly up to a point where the GUI doesn't work fluently anymore.

I have a minimal working example using a stub to demonstrate how my code currently works. However, given that it is a minimal working example, it will not visibly slow down.

import sys
from time import sleep

import av
import numpy as np
import pyqtgraph as pg
from PySide6.QtCore import QThread, Signal, Slot, Qt
from PySide6.QtWidgets import QApplication, QHBoxLayout, QWidget, QVBoxLayout, QPushButton, QGroupBox


class RGBCameraStub(QThread):

    newFrame = Signal(np.ndarray)

    def __init__(self):
        super().__init__()
        self.killSwitch = True

    def stop(self):
        self.killSwitch = False
        self.quit()
        self.wait()

    def run(self):
        self.killSwitch = True
        while self.killSwitch:
            self.newFrame.emit((np.random.rand(1456, 1080, 3) * 255).astype(np.uint8))
            sleep((20 + int(np.random.rand() * 30))/ 1000)


class VideoWriter(QThread):

    def __init__(self):
        super().__init__()
        self.output_container = av.open('output_video.mkv', mode='w')
        self.stream = self.output_container.add_stream('ffv1', rate=None)
        self.stream.width = 1456
        self.stream.height = 1080
        self.stream.pix_fmt = 'yuv420p'

    @Slot(np.ndarray)
    def addFrame(self, frame: np.ndarray):
        av_frame = av.VideoFrame.from_ndarray(frame, format='rgb24')
        av_frame.pts = None # Leave emtpy for auto-handling - variable framerate?
        for packet in self.stream.encode(av_frame):
            self.output_container.mux(packet)

    def stop(self):
        self.output_container.close()
        self.quit()
        self.wait()

    def run(self):
        self.exec()


class VideoBox(QGroupBox):

    def __init__(self, title):
        super().__init__(title=title)
        self.createLayout()
        self.videoWidget.setImage((np.random.rand(1456, 1080, 3) * 255).astype(np.uint8))

    def createLayout(self):
        layout = QVBoxLayout()
        self.videoWidget = pg.RawImageWidget()
        layout.addWidget(self.videoWidget)
        self.setLayout(layout)
        self.setStyleSheet("""QGroupBox {
            border: 1px solid #494B4F;
            margin-top: 8px;
            min-width: 180px;
            min-height: 180px;
            padding: 2px 0px 0px 0px;
            }
        QGroupBox::title {
            color: #aeb0b8;
            subcontrol-origin: margin;
            subcontrol-position: top left;
            left: 20px;
            padding: 0 8px;
        }""")

    def setImage(self, data: np.ndarray):
        self.videoWidget.setImage(data)

class MainWindow(QWidget):

    closeSignal = Signal()

    def __init__(self):
        super().__init__()
        self.setGeometry(0, 0, 900, 720)
        self.createLayout()

    def createLayout(self):
        self.vimbaImage = VideoBox("RGB")
        self.info = self.infoLayout()

        layout = QVBoxLayout()
        layout.addWidget(self.vimbaImage)
        layout.addWidget(self.info)
        self.setLayout(layout)

        self.setAttribute(Qt.WA_StyledBackground, True)
        self.setStyleSheet("MainWindow { background-color: #1e1f22; }")

    def infoLayout(self):
        widget = QWidget()
        layout = QVBoxLayout()

        rgbButtonWidget = QWidget()
        buttonLayout = QHBoxLayout()
        self.connectButton = QPushButton('Connect', parent=self)
        self.disconnectButton = QPushButton('Disconnect', parent=self)
        buttonLayout.addWidget(self.connectButton)
        buttonLayout.addWidget(self.disconnectButton)
        buttonLayout.addStretch()
        rgbButtonWidget.setLayout(buttonLayout)
        layout.addWidget(rgbButtonWidget)

        widget.setLayout(layout)
        return widget

    def closeEvent(self, event):
        self.closeSignal.emit()
        event.accept()



if __name__ == "__main__":
    app = QApplication(sys.argv)

    rgbCamera = RGBCameraStub()
    videoWriter = VideoWriter()
    videoWriter.start()

    main_window = MainWindow()

    # Button connections
    main_window.connectButton.clicked.connect(rgbCamera.start)
    main_window.disconnectButton.clicked.connect(rgbCamera.stop)
    # main_window.disconnectButton.clicked.connect(videoWriter.stop)

    # Display frames
    rgbCamera.newFrame.connect(main_window.vimbaImage.setImage)

    # Write frame to file
    rgbCamera.newFrame.connect(videoWriter.addFrame)

    # Close application
    main_window.closeSignal.connect(rgbCamera.stop)
    main_window.closeSignal.connect(videoWriter.stop)

    main_window.show()
    sys.exit(app.exec())

My question(s) therefore are:

As a side note, I currently use the PyAV wrapper for the FFmpeg library, however I am open to other suggestions.

Upvotes: 1

Views: 124

Answers (0)

Related Questions