如何正确监听来自node.js的postgresql通知

问题描述 投票:0回答:3

目标:
当一条新记录插入到特定的 PostgreSQL 表中时,我希望 PostgreSQL 通知我的 Node.js Web 应用程序,以便它可以启动对外部服务的 API 调用。

我了解基本步骤是:

  1. 建立一个 PostgreSQL 触发器函数,它将执行 pg_notify() 方法。
  2. 建立一个PostgreSQL触发器,在表插入后执行触发器功能。
  3. 在node.js中建立一个机制来监听特定于通道的PostgreSQL通知。

这是我每一步的尝试:

  1. notify_app_after_table_insert.pgsql中的触发函数

    CREATE OR REPLACE FUNCTION notify_app_after_table_insert()
    RETURNS TRIGGER AS
    $BODY$
        BEGIN
            PERFORM pg_notify('channel', row_to_json(NEW)::text);
            RETURN new;
        END;
    $BODY$
    LANGUAGE plpgsql
    
  2. trigger_notify_app_after_table_insert.sql中触发

    CREATE TRIGGER trigger_notify_app_after_table_insert
    AFTER INSERT
    ON table
    FOR EACH ROW
    EXECUTE PROCEDURE notify_app_after_table_insert();
    
  3. index.js 中的监听器机制(在我的网络应用程序的后端)

    //tools
    const express = require('express');
    const app = express();
    const cors = require('cors');
    const bodyParser = require('body-parser');
    const port = 3001;
    const pool = require('./db'); //stores my postgresql credentials
    
    // Middleware
    app.use(cors())
    app.use(bodyParser.json())
    app.use(bodyParser.urlencoded({extended: true}))
    
    // Apply app.listen notification to console.log
    app.listen(port, () => {
        console.log(`App running on port ${port}.`)
    })
    
    // Apply channel-specific listener mechanism
    pool.connect(function(err, client, done) {
        if(err) {
            console.log(err);
        }
        client.on('notification', function(msg) {
            console.log(msg);
        })
        client.query("LISTEN channel");
        done();
    });
    

问题:
当后端 Web 应用程序服务器正在运行并且新记录插入到数据库表中时,我希望在我的 Web 应用程序的终端中看到一条通知消息,但什么也没有出现。我怀疑问题出在index.js的最后一个代码块中,但无法隔离它。

关于如何在index.js中正确接收通知有什么建议吗?预先感谢。

node.js postgresql notify listen pg-notify
3个回答
2
投票

我遇到了同样的问题,我决定使用 pg-listen (https://github.com/andywer/pg-listen)。这是我的实现:

PG:

CREATE TABLE active.events(
  uid UUID DEFAULT gen_random_uuid(),
  created_ts TIMESTAMP DEFAULT NOW(),
  consumed_ts TIMESTAMP NULL,
  origin VARCHAR(200) NOT NULL,
  channel VARCHAR(200) NOT NULL,
  type VARCHAR(50) NOT NULL,
  path VARCHAR(200) NOT NULL,
  payload JSONB NOT NULL,
  result JSONB,
  CONSTRAINT events_pkey PRIMARY KEY(uid),
  CONSTRAINT events_ukey UNIQUE(uid)
);
CREATE INDEX ON active.events(uid);
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE active.events TO _pg_mb_rl;
ALTER TABLE active.events OWNER TO _pg_mb_rl;

-- TRIGGER
CREATE OR REPLACE FUNCTION active.tg_notify_events()
 RETURNS TRIGGER
 LANGUAGE PLPGSQL
AS $tg_notify_events$
DECLARE
    --channel TEXT := TG_ARGV[0];
BEGIN
    PERFORM pg_notify(NEW.channel, row_to_json(NEW)::TEXT);
    UPDATE active.events SET consumed_ts = NOW() WHERE uid = NEW.uid;
  RETURN NULL;
END;
$tg_notify_events$;

CREATE OR REPLACE TRIGGER notify_events
    AFTER INSERT ON active.events
    FOR EACH ROW EXECUTE PROCEDURE active.tg_notify_events();

节点:

const createSubscriber = require('pg-listen');

const channel = 'message_queue';
const subscriber = createSubscriber({ connectionString: process.env.DATABASE_URL });
subscriber.notifications.on(channel, (payload) => {
  console.log('Received notification in ' + channel, payload);
});

subscriber.events.on('error', (error) => {
  console.error('Fatal database connection error:', error)
  process.exit(1)
});

process.on('exit', () => {
  subscriber.close()
});

await subscriber.connect();
await subscriber.listenTo(channel);

希望有帮助!


1
投票

我认为这是因为秩序。 像这样写:

client.query("LISTEN channel");
client.on('notification', function(msg) {
  console.log(msg);
})

对我来说,首先查询 LISTEN 是有效的。


0
投票

我没有做过任何测试,但由于 Goga Okradze 声明它有效,我没有理由怀疑......(除了调用顺序之外,我猜调用顺序不相关)。不幸的是,答案的细节确实很差,我理解为什么很难重现它。

OP代码中的问题似乎只是最后一行代码:

done();
:它关闭了连接,因此它也停止监听事件。

我敢打赌,只需删除该调用,POC 就会开始工作。

不专业的读者可能会想:真的永远不会关闭连接吗?

当然!只要我们有兴趣接收事件,连接就必须保持打开状态。

也许我们可以改进 POC,添加重新连接功能。

const addListener = () => pool.connect(function(err, client, done) {
  if(err) {
    console.log(err);

    // in case of error while connecting (DB down?), retry after 1"
    return setTimeout(addListener, 1000).unref();
  }

  // in case of error, close the client
  client.on('error', done);

  // when client is closed, open a new one
  client.on('end', addListener);
  // this should be improved to handle a correct server shutdown
  // in case of server shutdown,
  // probably we want to close the client without opening a new one

  client.on('notification', function(msg) {
    console.log(msg);

    // perform here actual message handling
  })

  client.query("LISTEN channel");
});

addListener();
© www.soinside.com 2019 - 2024. All rights reserved.