节点 Redis XREAD 屏蔽订阅

问题描述 投票:0回答:1

我正在将一个redis pubsub系统转换为redis流,这样我就可以为我的服务器发送的事件增加一些容错性。

用传统的方式订阅是微不足道的。

import { createClient } from 'redis';

const redisOptions = {
  url: `${process.env.REDIS_URL}/0`
}
const redis = createClient(redisOptions);
redis.setMaxListeners(100000);

redis.on("message", (channel, message) => {
  console.log(channel);
  console.log(message);
});

redis.subscribe('foo');

这将永久地阻止,并保持连接的开放性. 在这种情况下,发布到redis会添加到你的日志中。

const json = { a: 1, b: 2 };

redis.publish('foo', JSON.stringify(json));

转换到流,你可以使用 XREAD 而不是订阅,以及 XADD 而不是发布,数据有很大的不同。我苦恼的部分是屏蔽。

redis.xread('BLOCK', 0, 'STREAMS', 'foo', '$', (err, str) => {
  if (err) return console.error('Error reading from stream:', err);

  str.forEach(message => {
    console.log(message);
  });
}

当发送消息时,第一个消息被我的 "订阅 "接收,但没有进一步的消息被记录。

node.js redis publish-subscribe redis-streams
1个回答
1
投票

说实话,我之所以问这个问题,只是因为google对我不好,我也找不到其他人发过这个问题。希望能帮到你!

所以。XREAD 仅在首次呼叫时阻断。它将在设定的时间段内坐等(如果您将时间设置为0,则为无限期),但一旦它接收到数据,它的职责就被认为是完成了,它就会解锁。为了保持 "订阅 "的有效性,你需要调用 XREAD 再次使用流中最近的id。这将取代初始的 $ 值,我们传递给它。

递归似乎是一个完美的解决方案。

const xread = ({ stream, id }) => {
  redis.xread('BLOCK', 0, 'STREAMS', stream, id, (err, str) => {
    if (err) return console.error('Error reading from stream:', err);

    str[0][1].forEach(message => {
      id = message[0];
      console.log(id);
      console.log(message[1]);
    });

    xread({ stream, id })
  });
}

xread({ stream: 'asdf', id: '$' })

© www.soinside.com 2019 - 2024. All rights reserved.