目标:
当一条新记录插入到特定的 PostgreSQL 表中时,我希望 PostgreSQL 通知我的 Node.js Web 应用程序,以便它可以启动对外部服务的 API 调用。
我了解基本步骤是:
这是我每一步的尝试:
notify_app_after_table_insert.pgsql中的触发函数
CREATE OR REPLACE FUNCTION notify_app_after_table_insert()
RETURNS TRIGGER AS
$BODY$
BEGIN
PERFORM pg_notify('channel', row_to_json(NEW)::text);
RETURN new;
END;
$BODY$
LANGUAGE plpgsql
trigger_notify_app_after_table_insert.sql中触发
CREATE TRIGGER trigger_notify_app_after_table_insert
AFTER INSERT
ON table
FOR EACH ROW
EXECUTE PROCEDURE notify_app_after_table_insert();
index.js 中的监听器机制(在我的网络应用程序的后端)
//tools
const express = require('express');
const app = express();
const cors = require('cors');
const bodyParser = require('body-parser');
const port = 3001;
const pool = require('./db'); //stores my postgresql credentials
// Middleware
app.use(cors())
app.use(bodyParser.json())
app.use(bodyParser.urlencoded({extended: true}))
// Apply app.listen notification to console.log
app.listen(port, () => {
console.log(`App running on port ${port}.`)
})
// Apply channel-specific listener mechanism
pool.connect(function(err, client, done) {
if(err) {
console.log(err);
}
client.on('notification', function(msg) {
console.log(msg);
})
client.query("LISTEN channel");
done();
});
问题:
当后端 Web 应用程序服务器正在运行并且新记录插入到数据库表中时,我希望在我的 Web 应用程序的终端中看到一条通知消息,但什么也没有出现。我怀疑问题出在index.js的最后一个代码块中,但无法隔离它。
关于如何在index.js中正确接收通知有什么建议吗?预先感谢。
我遇到了同样的问题,我决定使用 pg-listen (https://github.com/andywer/pg-listen)。这是我的实现:
PG:
CREATE TABLE active.events(
uid UUID DEFAULT gen_random_uuid(),
created_ts TIMESTAMP DEFAULT NOW(),
consumed_ts TIMESTAMP NULL,
origin VARCHAR(200) NOT NULL,
channel VARCHAR(200) NOT NULL,
type VARCHAR(50) NOT NULL,
path VARCHAR(200) NOT NULL,
payload JSONB NOT NULL,
result JSONB,
CONSTRAINT events_pkey PRIMARY KEY(uid),
CONSTRAINT events_ukey UNIQUE(uid)
);
CREATE INDEX ON active.events(uid);
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE active.events TO _pg_mb_rl;
ALTER TABLE active.events OWNER TO _pg_mb_rl;
-- TRIGGER
CREATE OR REPLACE FUNCTION active.tg_notify_events()
RETURNS TRIGGER
LANGUAGE PLPGSQL
AS $tg_notify_events$
DECLARE
--channel TEXT := TG_ARGV[0];
BEGIN
PERFORM pg_notify(NEW.channel, row_to_json(NEW)::TEXT);
UPDATE active.events SET consumed_ts = NOW() WHERE uid = NEW.uid;
RETURN NULL;
END;
$tg_notify_events$;
CREATE OR REPLACE TRIGGER notify_events
AFTER INSERT ON active.events
FOR EACH ROW EXECUTE PROCEDURE active.tg_notify_events();
节点:
const createSubscriber = require('pg-listen');
const channel = 'message_queue';
const subscriber = createSubscriber({ connectionString: process.env.DATABASE_URL });
subscriber.notifications.on(channel, (payload) => {
console.log('Received notification in ' + channel, payload);
});
subscriber.events.on('error', (error) => {
console.error('Fatal database connection error:', error)
process.exit(1)
});
process.on('exit', () => {
subscriber.close()
});
await subscriber.connect();
await subscriber.listenTo(channel);
希望有帮助!
我认为这是因为秩序。 像这样写:
client.query("LISTEN channel");
client.on('notification', function(msg) {
console.log(msg);
})
对我来说,首先查询 LISTEN 是有效的。
我没有做过任何测试,但由于 Goga Okradze 声明它有效,我没有理由怀疑......(除了调用顺序之外,我猜调用顺序不相关)。不幸的是,答案的细节确实很差,我理解为什么很难重现它。
OP代码中的问题似乎只是最后一行代码:
done();
:它关闭了连接,因此它也停止监听事件。
我敢打赌,只需删除该调用,POC 就会开始工作。
不专业的读者可能会想:真的永远不会关闭连接吗?
当然!只要我们有兴趣接收事件,连接就必须保持打开状态。
也许我们可以改进 POC,添加重新连接功能。
const addListener = () => pool.connect(function(err, client, done) {
if(err) {
console.log(err);
// in case of error while connecting (DB down?), retry after 1"
return setTimeout(addListener, 1000).unref();
}
// in case of error, close the client
client.on('error', done);
// when client is closed, open a new one
client.on('end', addListener);
// this should be improved to handle a correct server shutdown
// in case of server shutdown,
// probably we want to close the client without opening a new one
client.on('notification', function(msg) {
console.log(msg);
// perform here actual message handling
})
client.query("LISTEN channel");
});
addListener();