Java中如何从生菜RedisPubSubListener获取消息?

问题描述 投票:0回答:2

我刚刚开始接触 redis、lettuce 和异步编码。现在遗憾的是,我找不到任何有关如何将消息从侦听器获取到我的程序中的示例。 javadoc 或我找到的有关这些函数的任何其他信息也没有多大帮助。那么有人可以解释如何将已发布的消息放入字符串中吗?

我目前的代码如下所示:

RedisClient client = RedisClient.create("redis://" + host + "/0");
StatefulRedisPubSubConnection<String, String> con = client.connectPubSub();
RedisPubSubListener<String, String> listener = new RedisPubSubListener<String, String>() {@Override methods to be implemented???}
con.addListener(listener);
RedisPubSubCommands<String, String> sync = con.sync();
sync.subscribe("channel");

我很确定我必须实现侦听器的消息方法,但我不知道是否要从那里开始。我知道参数代表什么...但是这些方法的返回值为 void,因此它们也不会向我输出任何消息。

那么,从哪里开始呢? (完全困惑)

java redis publish-subscribe
2个回答
17
投票

你开了个好头。 Redis Pub/Sub 至少涉及两方:

  1. 订户
  2. 和出版商

订阅者(我猜这并不奇怪)订阅频道、模式或两者。

发布者将消息发布到频道。此设置也需要反映在您的代码中。

我使用

RedisPubSubAdapter
稍微扩展了您的代码,因此代码不需要实现所有方法,只需实现我们感兴趣的方法,例如
message(channel, message)
:

RedisClient client = RedisClient.create("redis://" + host + "/0");
StatefulRedisPubSubConnection<String, String> con = client.connectPubSub();

RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() {

    @Override
    public void message(String channel, String message) {
        System.out.println(String.format("Channel: %s, Message: %s", channel, message));
    }
};

con.addListener(listener);
RedisPubSubCommands<String, String> sync = con.sync();
sync.subscribe("channel");

添加监听器并且客户端订阅频道后,就可以接收 Pub/Sub 消息了。当通知到达时,lettuce 将调用侦听器的方法。此时,重要的是要了解通知是在 I/O 线程上处理的,该线程与设置客户端和订阅的线程不同。

让我们来到发送方。要将消息发送到您的频道,您需要额外的连接(或者使用

redis-cli
并发出
PUBLISH channel message
)。

StatefulRedisConnection<String, String> sender = client.connect();

sender.sync().publish("channel", "Message 1");
sender.sync().publish("channel", "Message 2");

Redis 将在名为

Message 1
的通道上发布消息
Message 1
channel
(不是一个创意名称,但它现在可以完成这项工作)。

如果您连续执行代码并在发送消息后稍等一下,那么您很有可能会通知侦听器并且您会看到一些系统输出,例如:

Channel: channel, Message: Message 1
Channel: channel, Message: Message 2

异步:效果如何?

现在是异步的棘手部分。使用异步通信在某些情况下是有益的,但会增加复杂性。如果您可以在结果到达之前进行工作(进行一些计算,直到您需要结果),或者您只想启动 I/O 并释放您正在处理的线程。服务器应用程序是异步模式的良好环境。典型的服务器具有有限的线程资源,并且它会一直运行直到关闭。在服务器启动时,您将注册一个订阅。一旦消息进来,它就会在 I/O 线程上进行处理,并且您的侦听器将被调用

当在独立应用程序中使用异步命令执行时(例如简单的

main
),那么您将拥有一个顺序流。一旦代码流完成,异步消息传递将导致您的程序退出。这并不一定意味着已接收或处理了 Pub/Sub 消息。如果您在
main
中一一运行这两个代码块,您很可能根本看不到任何输出,因为程序终止的速度比 I/O 发生的速度快。现在同步开始发挥作用。如何处理同步有无数种可能性,但现在让我们看一下两种替代方案:

  1. CountDownLatch
    :在释放程序流程之前需要发生许多事情
  2. Thread.sleep(…)
    :等待几毫秒

CountDownLatch使用

final CountDownLatch latch = new CountDownLatch(2);
RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() {

    @Override
    public void message(String channel, String message) {
        System.out.println(String.format("Channel: %s, Message: %s", channel, message));
        latch.countDown();
    }
};

// ...
sender.sync().publish("channel", "Message 2");

latch.await();

在上面的代码中,

CountDownLatch
已准备好倒计时两次 (
latch.countDown()
)。调用
latch.await()
会阻塞主线程(程序流程)并使其等待,直到
CountDownLatch
倒计时,从而释放程序以继续。

Thread.sleep(…)使用

RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() {

    @Override
    public void message(String channel, String message) {
        System.out.println(String.format("Channel: %s, Message: %s", channel, message));
    }
};

con.addListener(listener);
RedisPubSubCommands<String, String> sync = con.sync();
sync.subscribe("channel");

StatefulRedisConnection<String, String> sender = client.connect();

sender.sync().publish("channel", "Message 1");
sender.sync().publish("channel", "Message 2");

latch.await();

Thread.sleep(1000);

此代码使用

Thread.sleep(1000);
等待一秒钟(在主线程上)。这通常足以接收消息。 不要这样做。这种方法既快速又肮脏,可能适合播放和调试,但在合理的代码中避免
Thread.sleep

两个参与者的事情

Redis对订阅通道/模式的连接施加限制:一旦订阅,您只能执行

SUBSCRIBE
PSUBSCRIBE
UNSUBSCRIBE
PUNSUBSCRIBE
PING
QUIT
命令。不允许在该连接上执行
PUBLISH
。因此,您需要使用额外的连接。


0
投票

我想知道订阅者是否是一个集群,例如如果有多个服务器正在侦听,如何避免多个订阅者处理同一消息。 Kafka有组的概念,Redis里有类似的吗?

© www.soinside.com 2019 - 2024. All rights reserved.