如何在Radisson(redis-java客户端)中让一个订阅者处理消息(考虑有多于一个)

问题描述 投票:0回答:2

在我的项目中,我使用 radisson 创建了一个简单的 pub/sub 主题 (https://github.com/redisson/redisson)。发布者将发布一些消息,并且将有多个订阅者在不同的机器上运行。当我发布时,所有订阅者都会收到消息。但我希望订阅者中的任何一个都可以处理该消息,即如果一个人处理该消息,其他人就应该忽略它。丽笙可以吗?

听众:

RTopic topic = redisson.getTopic("topic2");
topic.addListener(Person.class, new MessageListener<Person>() {
    @Override
    public void onMessage(CharSequence charSequence, Person person) {
        System.out.println("PERSON : "+person.toString());
    }
});

出版商:

 Person person = new Person("anyName","female");
 RedissonClient redisson = Redisson.create();
 RTopic topic = redisson.getTopic("topic2");
 topic.publish(person);

是否可以加锁或其他东西,以便只有一个订阅者收听它。还有其他工具支持这种行为吗?

java redis publish-subscribe
2个回答
0
投票

Pub-Sub 不是为此设计的。了解 ActiveMQ/RabbitMQ 或 SQS 中的队列,这将满足您的场景


0
投票

您想要做的事情更符合消息队列,其中最多有一个消费者接收消息。使用 redisson 可以做到这一点,但您将无法使用 pub/sub 模型。

相反,您可以创建一个

RBoundedBlockingQueue
,其中每个消费者等待新元素到达队列。当新元素出现时,等待时间最长的消费者将从队列头部删除该元素,其余消费者将等待。这确保了该消息只能被一个消费者读取。这是通过使用 redis BLPOP 命令来实现的,该命令也由 RedissonBlockingQueues

使用

这是一个例子:

public class MessageQueueRedisson
{
    public static void main(String[] args) throws InterruptedException
    {
        RedissonClient redissonClient = Redisson.create();

        RBoundedBlockingQueue<String> blockingQueue = redissonClient.getBoundedBlockingQueue("consumer-queue");
        //optionally set maximum size of queue
        blockingQueue.trySetCapacity(10000);

        //insert 10000 unique elements
        for(int i = 0; i < 10000; ++i) {
            blockingQueue.offer(i + "");
        }

        //thread-safe list that will add all elements polled from redis
        final List<String> copyOnWriteList = new CopyOnWriteArrayList<>();

        final List<Future<?>> results = new ArrayList<>();
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for(int i = 0; i < 5; ++i) {
            //create 5 consumers on the consumer-queue
            results.add(executorService.submit(createRunnable(redissonClient, copyOnWriteList)));
        }


        for(Future<?> future: results) {
            while(!future.isDone()) {
                //wait for queue to empty and runnables to finish
                TimeUnit.SECONDS.sleep(5L);
            }
        }

        //Convert final arraylist to set. if list.size() == set.size(), then elements were only polled by one consumer
        Set<String> setResults = new HashSet<>(copyOnWriteList);
        System.out.println(copyOnWriteList.size() == setResults.size());
        System.out.println("list size: " + copyOnWriteList.size() + "set size: " + setResults.size());
    }

    public static Runnable createRunnable(RedissonClient redissonClient, List<String> copyOnWriteList) {
        return () -> {
            //get redis blocking queue
            RBoundedBlockingQueue<String> blockingQueue = redissonClient.getBoundedBlockingQueue("consumer-queue");
            while(!blockingQueue.isEmpty()) {
                //get next element from head of queue
                String nextValue = blockingQueue.poll();
                if(nextValue != null) {
                    copyOnWriteList.add(nextValue);
                }
            }
        };
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.