Reputation: 101
Objective:
When a new record is inserted into a specific PostgreSQL table, I would like PostgreSQL to notify my node.js web application so that it can initiate an API call to an external service.
I understand the basic steps are:
Here is my attempt at each step:
Trigger function in notify_app_after_table_insert.pgsql
CREATE OR REPLACE FUNCTION notify_app_after_table_insert()
RETURNS TRIGGER AS
$BODY$
BEGIN
PERFORM pg_notify('channel', row_to_json(NEW)::text);
RETURN new;
END;
$BODY$
LANGUAGE plpgsql
Trigger in trigger_notify_app_after_table_insert.sql
CREATE TRIGGER trigger_notify_app_after_table_insert
AFTER INSERT
ON table
FOR EACH ROW
EXECUTE PROCEDURE notify_app_after_table_insert();
Listener mechanism in index.js (inside my web app's backend)
//tools
const express = require('express');
const app = express();
const cors = require('cors');
const bodyParser = require('body-parser');
const port = 3001;
const pool = require('./db'); //stores my postgresql credentials
// Middleware
app.use(cors())
app.use(bodyParser.json())
app.use(bodyParser.urlencoded({extended: true}))
// Apply app.listen notification to console.log
app.listen(port, () => {
console.log(`App running on port ${port}.`)
})
// Apply channel-specific listener mechanism
pool.connect(function(err, client, done) {
if(err) {
console.log(err);
}
client.on('notification', function(msg) {
console.log(msg);
})
client.query("LISTEN channel");
done();
});
Problem:
When the backend web-app server is running and a new record is inserted in the db table, I expect to see a notification message in my web-app's terminal, but nothing appears. I suspect the problem is in the last code block of index.js, but haven't been able to isolate it.
Any suggestions on how to correctly receive the notification in index.js? Thanks in advance.
Upvotes: 10
Views: 9416
Reputation: 15797
I've done no tests, but since Goga Okradze states it works I have no reason to doubt... (apart from order of the calls, I guess the order of the calls are not relevant). Unfortunately the answer is really poor of details and I understand why it is hard to reproduce it.
The problem in OP code seems to be just the last line of code: done();
: it closes the connection, so it also stops listening to events.
I bet that simply removing that call the POC will start working.
An unexpert reader could think: really never close the connection?
Sure! As long as we are interested in receiving events, the connection must be kept open.
Probably we could improve the POC adding a reconnection feature.
const addListener = () => pool.connect(function(err, client, done) {
if(err) {
console.log(err);
// in case of error while connecting (DB down?), retry after 1"
return setTimeout(addListener, 1000).unref();
}
// in case of error, close the client as well
client.on('error', done);
// when client is closed, open a new one
client.on('end', addListener);
// this should be improved to handle a correct server shutdown
// in case of server shutdown,
// probably we want to close the client without opening a new one
client.on('notification', function(msg) {
console.log(msg);
// perform here actual message handling
})
client.query("LISTEN channel");
});
addListener();
Upvotes: 1
Reputation: 31
I had the same problem and I decided to use pg-listen (https://github.com/andywer/pg-listen). Here's my implementation:
PG:
CREATE TABLE active.events(
uid UUID DEFAULT gen_random_uuid(),
created_ts TIMESTAMP DEFAULT NOW(),
consumed_ts TIMESTAMP NULL,
origin VARCHAR(200) NOT NULL,
channel VARCHAR(200) NOT NULL,
type VARCHAR(50) NOT NULL,
path VARCHAR(200) NOT NULL,
payload JSONB NOT NULL,
result JSONB,
CONSTRAINT events_pkey PRIMARY KEY(uid),
CONSTRAINT events_ukey UNIQUE(uid)
);
CREATE INDEX ON active.events(uid);
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE active.events TO _pg_mb_rl;
ALTER TABLE active.events OWNER TO _pg_mb_rl;
-- TRIGGER
CREATE OR REPLACE FUNCTION active.tg_notify_events()
RETURNS TRIGGER
LANGUAGE PLPGSQL
AS $tg_notify_events$
DECLARE
--channel TEXT := TG_ARGV[0];
BEGIN
PERFORM pg_notify(NEW.channel, row_to_json(NEW)::TEXT);
UPDATE active.events SET consumed_ts = NOW() WHERE uid = NEW.uid;
RETURN NULL;
END;
$tg_notify_events$;
CREATE OR REPLACE TRIGGER notify_events
AFTER INSERT ON active.events
FOR EACH ROW EXECUTE PROCEDURE active.tg_notify_events();
NODEJS:
const createSubscriber = require('pg-listen');
const channel = 'message_queue';
const subscriber = createSubscriber({ connectionString: process.env.DATABASE_URL });
subscriber.notifications.on(channel, (payload) => {
console.log('Received notification in ' + channel, payload);
});
subscriber.events.on('error', (error) => {
console.error('Fatal database connection error:', error)
process.exit(1)
});
process.on('exit', () => {
subscriber.close()
});
await subscriber.connect();
await subscriber.listenTo(channel);
Hope it helps!
Upvotes: 3
Reputation: 11
I think this is because of order. Write like this:
client.query("LISTEN channel");
client.on('notification', function(msg) {
console.log(msg);
})
For me querying for LISTEN first worked.
Upvotes: 1