shashashamti2008
shashashamti2008

Reputation: 2337

Getting false positive trigger signal from libpqxx await_notification()

The code below uses the libpqxx C++ library to listen for two trigger signals coming from a TimescaleDB database. There are two infinite loops, each of which waits until a trigger occurs.

// Simplified excerpt of the waiting logic; conn is a connection object
// declared outside this excerpt.
// NOTE(review): as written here the first loop has no exit, so the second
// loop is only reachable if await_notification() leaves the first by throwing.
while (true){
    // The value returned by await_notification() is discarded, so the log
    // line below runs every time the call returns, whatever the reason.
    conn.await_notification();
    spdlog::info("channel1_updated");
}
while (true){
    // Same pattern as above: the return value is not inspected before logging.
    conn.await_notification();
    spdlog::info("channel2_updated");
}

The issue is that, although no trigger is generated, I get false-positive messages: channel1_updated and channel2_updated are printed without any signal having been sent, as if conn.await_notification() were not behaving properly. I expect it to simply block until a signal is received.

#include "pqxx/config-public-compiler.h"
#include <cctype>
#include <cerrno>
#include <cstring>
#include <ctime>
#include <iostream>

#include <pqxx/internal/header-pre.hxx>
#include <pqxx/internal/wait.hxx>
#include <pqxx/internal/header-post.hxx>
#include <pqxx/notification>
#include <pqxx/transaction>
#include <pqxx/transactor>
#include <thread>

#include "spdlog/spdlog.h"
#include "spdlog/cfg/env.h"
#include "spdlog/fmt/ostr.h" 

/// Notification receiver that records whether a notification has arrived on
/// its channel and which backend process sent the most recent one.
///
/// The receiver is registered on the given connection for the channel `Name`
/// by the pqxx::notification_receiver base-class constructor.
class PostgresChannelListener final : public pqxx::notification_receiver {
    /// Set to true once operator() has been invoked at least once.
    bool m_done{false};
public:
    /// Backend PID passed to the most recent operator() call; 0 until the
    /// first notification has been handled.
    int backend_pid{0};

    /// @param conn Connection on which to listen.
    /// @param Name Name of the notification channel.
    explicit PostgresChannelListener(pqxx::connection &conn, std::string Name) :
            pqxx::notification_receiver(conn, Name) {}

    /// Callback invoked for a notification on this receiver's channel.
    /// The payload string (first argument) is ignored.
    /// @param be_pid Backend process ID reported with the notification.
    void operator()(std::string const &, int be_pid) override {
        m_done = true;
        this->backend_pid = be_pid;
        // Two placeholders: the channel name and the notifying backend's pid.
        spdlog::info("Received notification: {} pid={}", channel(), be_pid);
    }

    /// @return true if at least one notification has been handled.
    bool done() const { return m_done; }
};

/// Connects to the database, listens on `channel`, and logs the channel name
/// each time at least one notification has actually been processed.
///
/// The function loops forever; it only returns when the connection cannot be
/// opened or when an exception derived from std::exception is thrown.
///
/// @param channel Name of the notification channel to listen on.
/// @return 1 on failure (connection not open, or an exception was caught).
int waitForChannelUpdated(const std::string& channel){
    try {
        pqxx::connection conn("dbname=postgres user=postgres password=password hostaddr=127.0.0.1 port=5432");
        if (conn.is_open()) {
            spdlog::info("Opened database successfully: {}", conn.dbname());
            spdlog::info("PostgreSQL server version: {}", conn.server_version());
        } else {
            spdlog::error("Failed to open database");
            return 1;
        }

        PostgresChannelListener L{conn, channel};

        // Sends one NOTIFY on the listener's own channel at start-up. This
        // notification comes back to this connection, so the first message
        // logged by the loop below is expected even without an external trigger.
        // NOTE(review): drop this block if a start-up self-notification is unwanted.
        pqxx::perform([&conn, &L] {
            pqxx::work tx{conn};
            tx.exec0("NOTIFY " + tx.quote_name(L.channel()));
            tx.commit();
        });

        while (true) {
            // await_notification() returns the number of notifications it
            // processed. Per the libpqxx documentation the call may also
            // return for other reasons (including periodic wake-ups), in
            // which case the count is zero and nothing must be reported.
            const int received = conn.await_notification();
            if (received > 0) {
                spdlog::info(channel);
            }
        }

    } catch (const std::exception &e) {
        spdlog::error("Error: {}", e.what());
        return 1;
    }

    // Not reached: the loop above only exits via an exception. Kept so every
    // syntactic path of this non-void function ends in a return statement.
    return 0;
}


int main() {

    std::thread threadChannel1([](){
        waitForChannelUpdated("channel1_updated");
    });

    std::thread threadChannel2([](){
        waitForChannelUpdated("channel2_updated");
    });

    threadChannel1.join();
    threadChannel2.join();

    return 0;
}

Upvotes: 1

Views: 88

Answers (0)

Related Questions