Reputation: 2337
The code below uses libpqxx
C++ library to listen to two trigger signals coming from a TimescaleDB database. There are two infinite loops to wait until a trigger occurs.
while (true){
conn.await_notification();
spdlog::info("channel1_updated");
}
while (true){
conn.await_notification();
spdlog::info("channel2_updated");
}
The issue is although no trigger is generated, I get false positive messages, and channel1_updated
and channel2_updated
get printed without any signal generated as if conn.await_notification()
did not behave properly. I am expecting it to simply block the code until a signal is received.
#include "pqxx/config-public-compiler.h"
#include <cctype>
#include <cerrno>
#include <cstring>
#include <ctime>
#include <iostream>
#include <pqxx/internal/header-pre.hxx>
#include <pqxx/internal/wait.hxx>
#include <pqxx/internal/header-post.hxx>
#include <pqxx/notification>
#include <pqxx/transaction>
#include <pqxx/transactor>
#include <thread>
#include "spdlog/spdlog.h"
#include "spdlog/cfg/env.h"
#include "spdlog/fmt/ostr.h"
class PostgresChannelListener final : public pqxx::notification_receiver {
bool m_done;
public:
int backend_pid;
explicit PostgresChannelListener(pqxx::connection &conn, std::string Name) :
pqxx::notification_receiver(conn, Name), m_done(false) {}
void operator()(std::string const &, int be_pid) override {
m_done = true;
this->backend_pid = be_pid;
spdlog::info("Received notification: {} pid=", channel(), be_pid);
}
bool done() const { return m_done; }
};
int waitForChannelUpdated(const std::string& channel){
try {
pqxx::connection conn("dbname=postgres user=postgres password=password hostaddr=127.0.0.1 port=5432");
if (conn.is_open()) {
spdlog::info("Opened database successfully: {}", conn.dbname());
spdlog::info("PostgreSQL server version: {}", conn.server_version());
} else {
spdlog::error("Failed to open database");
return 1;
}
PostgresChannelListener L{conn, channel};
pqxx::perform([&conn, &L] {
pqxx::work tx{conn};
tx.exec0("NOTIFY " + tx.quote_name(L.channel()));
tx.commit();
});
while (true) {
conn.await_notification();
spdlog::info(channel);
}
conn.close();
spdlog::info("Closed database successfully");
} catch (const std::exception &e) {
spdlog::error("Error: {}", e.what());
return 1;
}
return 0;
}
int main() {
std::thread threadChannel1([](){
waitForChannelUpdated("channel1_updated");
});
std::thread threadChannel2([](){
waitForChannelUpdated("channel2_updated");
});
threadChannel1.join();
threadChannel2.join();
return 0;
}
Upvotes: 1
Views: 88