我刚刚开始接触 redis、lettuce 和异步编码。现在遗憾的是,我找不到任何有关如何将消息从侦听器获取到我的程序中的示例。 javadoc 或我找到的有关这些函数的任何其他信息也没有多大帮助。那么有人可以解释如何将已发布的消息放入字符串中吗?
我目前的代码如下所示:
RedisClient client = RedisClient.create("redis://" + host + "/0");
StatefulRedisPubSubConnection<String, String> con = client.connectPubSub();
RedisPubSubListener<String, String> listener = new RedisPubSubListener<String, String>() {@Override methods to be implemented???}
con.addListener(listener);
RedisPubSubCommands<String, String> sync = con.sync();
sync.subscribe("channel");
我很确定我必须实现侦听器的消息方法,但我不知道是否要从那里开始。我知道参数代表什么...但是这些方法的返回值为 void,因此它们也不会向我输出任何消息。
那么,从哪里开始呢? (完全困惑)
你开了个好头。 Redis Pub/Sub 至少涉及两方:
订阅者(我猜这并不奇怪)订阅频道、模式或两者。
发布者将消息发布到频道。此设置也需要反映在您的代码中。
我使用
RedisPubSubAdapter
稍微扩展了您的代码,因此代码不需要实现所有方法,只需实现我们感兴趣的方法,例如 message(channel, message)
:
RedisClient client = RedisClient.create("redis://" + host + "/0");
StatefulRedisPubSubConnection<String, String> con = client.connectPubSub();
RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() {
@Override
public void message(String channel, String message) {
System.out.println(String.format("Channel: %s, Message: %s", channel, message));
}
};
con.addListener(listener);
RedisPubSubCommands<String, String> sync = con.sync();
sync.subscribe("channel");
添加监听器并且客户端订阅频道后,就可以接收 Pub/Sub 消息了。当通知到达时,lettuce 将调用侦听器的方法。此时,重要的是要了解通知是在 I/O 线程上处理的,该线程与设置客户端和订阅的线程不同。
让我们来到发送方。要将消息发送到您的频道,您需要额外的连接(或者使用
redis-cli
并发出 PUBLISH channel message
)。
StatefulRedisConnection<String, String> sender = client.connect();
sender.sync().publish("channel", "Message 1");
sender.sync().publish("channel", "Message 2");
Redis 将在名为
Message 1
的通道上发布消息 Message 1
和 channel
(不是一个创意名称,但它现在可以完成这项工作)。
如果您连续执行代码并在发送消息后稍等一下,那么您很有可能会通知侦听器并且您会看到一些系统输出,例如:
Channel: channel, Message: Message 1
Channel: channel, Message: Message 2
现在是异步的棘手部分。使用异步通信在某些情况下是有益的,但会增加复杂性。如果您可以在结果到达之前进行工作(进行一些计算,直到您需要结果),或者您只想启动 I/O 并释放您正在处理的线程。服务器应用程序是异步模式的良好环境。典型的服务器具有有限的线程资源,并且它会一直运行直到关闭。在服务器启动时,您将注册一个订阅。一旦消息进来,它就会在 I/O 线程上进行处理,并且您的侦听器将被调用
当在独立应用程序中使用异步命令执行时(例如简单的
main
),那么您将拥有一个顺序流。一旦代码流完成,异步消息传递将导致您的程序退出。这并不一定意味着已接收或处理了 Pub/Sub 消息。如果您在 main
中一一运行这两个代码块,您很可能根本看不到任何输出,因为程序终止的速度比 I/O 发生的速度快。现在同步开始发挥作用。如何处理同步有无数种可能性,但现在让我们看一下两种替代方案:
CountDownLatch
:在释放程序流程之前需要发生许多事情Thread.sleep(…)
:等待几毫秒CountDownLatch使用
final CountDownLatch latch = new CountDownLatch(2);
RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() {
@Override
public void message(String channel, String message) {
System.out.println(String.format("Channel: %s, Message: %s", channel, message));
latch.countDown();
}
};
// ...
sender.sync().publish("channel", "Message 2");
latch.await();
在上面的代码中,
CountDownLatch
已准备好倒计时两次 (latch.countDown()
)。调用 latch.await()
会阻塞主线程(程序流程)并使其等待,直到 CountDownLatch
倒计时,从而释放程序以继续。
Thread.sleep(…)使用
RedisPubSubListener<String, String> listener = new RedisPubSubAdapter<String, String>() {
@Override
public void message(String channel, String message) {
System.out.println(String.format("Channel: %s, Message: %s", channel, message));
}
};
con.addListener(listener);
RedisPubSubCommands<String, String> sync = con.sync();
sync.subscribe("channel");
StatefulRedisConnection<String, String> sender = client.connect();
sender.sync().publish("channel", "Message 1");
sender.sync().publish("channel", "Message 2");
latch.await();
Thread.sleep(1000);
此代码使用
Thread.sleep(1000);
等待一秒钟(在主线程上)。这通常足以接收消息。 不要这样做。这种方法既快速又肮脏,可能适合播放和调试,但在合理的代码中避免Thread.sleep
。
Redis对订阅通道/模式的连接施加限制:一旦订阅,您只能执行
SUBSCRIBE
、PSUBSCRIBE
、UNSUBSCRIBE
、PUNSUBSCRIBE
、PING
和 QUIT
命令。不允许在该连接上执行 PUBLISH
。因此,您需要使用额外的连接。
我想知道订阅者是否是一个集群,例如如果有多个服务器正在侦听,如何避免多个订阅者处理同一消息。 Kafka有组的概念,Redis里有类似的吗?