在我的项目中,我使用 radisson 创建了一个简单的 pub/sub 主题 (https://github.com/redisson/redisson)。发布者将发布一些消息,并且将有多个订阅者在不同的机器上运行。当我发布时,所有订阅者都会收到消息。但我希望订阅者中的任何一个都可以处理该消息,即如果一个人处理该消息,其他人就应该忽略它。丽笙可以吗?
听众:
RTopic topic = redisson.getTopic("topic2");
topic.addListener(Person.class, new MessageListener<Person>() {
@Override
public void onMessage(CharSequence charSequence, Person person) {
System.out.println("PERSON : "+person.toString());
}
});
出版商:
Person person = new Person("anyName","female");
RedissonClient redisson = Redisson.create();
RTopic topic = redisson.getTopic("topic2");
topic.publish(person);
是否可以加锁或其他东西,以便只有一个订阅者收听它。还有其他工具支持这种行为吗?
Pub-Sub 不是为此设计的。了解 ActiveMQ/RabbitMQ 或 SQS 中的队列,这将满足您的场景
您想要做的事情更符合消息队列,其中最多有一个消费者接收消息。使用 redisson 可以做到这一点,但您将无法使用 pub/sub 模型。
相反,您可以创建一个
RBoundedBlockingQueue
,其中每个消费者等待新元素到达队列。当新元素出现时,等待时间最长的消费者将从队列头部删除该元素,其余消费者将等待。这确保了该消息只能被一个消费者读取。这是通过使用 redis BLPOP 命令来实现的,该命令也由 RedissonBlockingQueues 使用
这是一个例子:
public class MessageQueueRedisson
{
public static void main(String[] args) throws InterruptedException
{
RedissonClient redissonClient = Redisson.create();
RBoundedBlockingQueue<String> blockingQueue = redissonClient.getBoundedBlockingQueue("consumer-queue");
//optionally set maximum size of queue
blockingQueue.trySetCapacity(10000);
//insert 10000 unique elements
for(int i = 0; i < 10000; ++i) {
blockingQueue.offer(i + "");
}
//thread-safe list that will add all elements polled from redis
final List<String> copyOnWriteList = new CopyOnWriteArrayList<>();
final List<Future<?>> results = new ArrayList<>();
ExecutorService executorService = Executors.newFixedThreadPool(5);
for(int i = 0; i < 5; ++i) {
//create 5 consumers on the consumer-queue
results.add(executorService.submit(createRunnable(redissonClient, copyOnWriteList)));
}
for(Future<?> future: results) {
while(!future.isDone()) {
//wait for queue to empty and runnables to finish
TimeUnit.SECONDS.sleep(5L);
}
}
//Convert final arraylist to set. if list.size() == set.size(), then elements were only polled by one consumer
Set<String> setResults = new HashSet<>(copyOnWriteList);
System.out.println(copyOnWriteList.size() == setResults.size());
System.out.println("list size: " + copyOnWriteList.size() + "set size: " + setResults.size());
}
public static Runnable createRunnable(RedissonClient redissonClient, List<String> copyOnWriteList) {
return () -> {
//get redis blocking queue
RBoundedBlockingQueue<String> blockingQueue = redissonClient.getBoundedBlockingQueue("consumer-queue");
while(!blockingQueue.isEmpty()) {
//get next element from head of queue
String nextValue = blockingQueue.poll();
if(nextValue != null) {
copyOnWriteList.add(nextValue);
}
}
};
}
}