Karim Dhrif

Reputation: 91

Multithreaded application with timed recording pipeline in GStreamer

I am trying to build a pipeline that records for a fixed duration (for example, 5 seconds), then stops and cleans up gracefully. To do that, I set a timer in a separate thread, which sends an EOS event to the pipeline when it expires:

    GstState current, pending;
    gst_element_get_state(pipeline, &current, &pending, GST_CLOCK_TIME_NONE);
    if (current != GST_STATE_PLAYING) {
      std::cerr << "Pipeline is not in PLAYING state, current state: "
                << gst_element_state_get_name(current) << std::endl;
    }
    gst_element_send_event(pipeline, gst_event_new_eos());
    std::cout << "EOS sent" << std::endl;
    // Wait for the EOS message to be received
    {
      std::unique_lock<std::mutex> lock(stop_mutex);
      stop_cv.wait(lock, [this] { return eos_received; });
      std::cout << "eos received" << std::endl;
    }
    // we can quit the main loop
    g_main_loop_quit(loop);
    g_main_loop_unref(loop);
    running = false;
  }
}

The problem is that I never seem to get an EOS message on the bus:

gboolean AutoRecorderSink::message_cb(GstBus *bus, GstMessage *message,
                                      gpointer user_data) {
  AutoRecorderSink *self = static_cast<AutoRecorderSink *>(user_data);
  switch (GST_MESSAGE_TYPE(message)) {
  case GST_MESSAGE_ERROR: {
    // Scope the locals to this case and initialise them before parsing.
    GError *err = nullptr;
    gchar *debug_info = nullptr;
    gst_message_parse_error(message, &err, &debug_info);
    std::cerr << "Error received from element " << GST_OBJECT_NAME(message->src)
              << ": " << (err ? err->message : "unknown") << std::endl;
    std::cerr << "Debugging information: " << (debug_info ? debug_info : "none")
              << std::endl;
    g_clear_error(&err);
    g_free(debug_info);
    self->stop();
    break;
  }
  case GST_MESSAGE_EOS:
    std::cout << "EOS Received - Stopping Recording.\n";
    {
      std::lock_guard<std::mutex> lock(self->stop_mutex);
      self->eos_received = true;
    }
    self->stop_cv.notify_one(); // continue with the stop() function
    break;
  default:
    break;
  }
  return TRUE;
}

Specifically, the message “EOS Received - Stopping Recording” is never printed.

Solutions I tried: I set the message-forward property on the pipeline so that the event is propagated to every element, but that didn’t help.

I also tried running the stop logic through g_idle_add so that it executes in the main loop thread, but that didn’t help either. Here is the code for the g_idle_add version:

gboolean AutoRecorderSink::stop_pipeline_idle(gpointer user_data) {
    AutoRecorderSink *recorder = static_cast<AutoRecorderSink *>(user_data);
    recorder->perform_stop();
    return G_SOURCE_REMOVE; // Remove the idle function after execution
}

void AutoRecorderSink::stop() {
    std::cout << "Stop called, checking running variable" << std::endl;
    if (running) {
        // Queue the stop logic to run in the main loop thread
        g_idle_add(stop_pipeline_idle, this);
    }
}

void AutoRecorderSink::perform_stop() {
    std::cout << "Stopping the pipeline" << std::endl;

    GstState current, pending;
    gst_element_get_state(pipeline, &current, &pending, GST_CLOCK_TIME_NONE);
    if (current != GST_STATE_PLAYING && current != GST_STATE_PAUSED) {
        std::cerr << "Pipeline is not in PLAYING or PAUSED state, current state: "
                  << gst_element_state_get_name(current) << std::endl;
    }

    // Send EOS to the pipeline
    if (!gst_element_send_event(pipeline, gst_event_new_eos())) {
        std::cerr << "Failed to send EOS event" << std::endl;
        return;
    }
    std::cout << "EOS sent" << std::endl;

    // Wait for the EOS message to be received on the bus
    {
        std::unique_lock<std::mutex> lock(stop_mutex);
        stop_cv.wait(lock, [this] { return eos_received; });
        std::cout << "EOS received" << std::endl;
    }

    // Clean up GMainLoop
    if (loop) {
        g_main_loop_quit(loop);
        g_main_loop_unref(loop);
        loop = nullptr;
    }
    running = false;
}

Note: the EOS does seem to take effect, because the recording I get has the duration I passed as a parameter. However, the EOS message never shows up on the bus for some reason. I am using GStreamer 1.16.

Upvotes: 0

Views: 54

Answers (1)

Karim Dhrif

Reputation: 91

I fixed the problem by registering the bus callback with

bus_watch_id = gst_bus_add_watch(bus, (GstBusFunc)message_cb, this);

rather than:

g_signal_connect(G_OBJECT(bus), "message", G_CALLBACK(message_cb), this);

Upvotes: 0
