为什么调用 Java DelayQueue 的 take() 方法不会阻塞所有线程的整个数据结构?

问题描述 投票:0回答:1

我试图弄清楚

java.util.concurrent.DelayQueue
在多线程环境中是如何工作的。我看到这个数据结构内部使用了
ReentrantLock
,并且它是在此类的每个方法的开头获取的。我尝试使用不同的方法来实现生产者-消费者模式,从队列中检索元素的顶部 (
poll()
/
take()
)。

public class DelayQueueTest {
    static DelayQueue<Delayed> delayQueue = new DelayQueue<>();
    public static final long executionTime = 2_000L;
    public static long timeStart;

    public static void go() throws InterruptedException {
        timeStart = System.currentTimeMillis();
        Thread consumer1 = new Thread(new Consumer());
        Thread producer = new Thread(new Producer());


        consumer1.start();
        Thread.sleep(200);
        producer.start();

        producer.join();
        consumer1.join();
    }

    static class Consumer implements Runnable {
        static DelayQueue<Delayed> delayQueue = DelayQueueTest.delayQueue;

        @Override
        public void run() {
            while (System.currentTimeMillis() - timeStart < executionTime) {
                System.out.println(String.format(
                        "Thread %s started taking at %d",
                        Thread.currentThread().getId(),
                        System.currentTimeMillis() - timeStart)
                );
                Delayed result;
                try {
                    result = delayQueue.take();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                System.out.print(String.format(
                        "Thread %s finished taking at %d... ",
                        Thread.currentThread().getId(),
                        System.currentTimeMillis() - timeStart)
                );

                if (result != null) System.out.println(String.format("with result %s", result.toString()));
                else System.out.println();
                System.out.format("Queue size: %d\n", delayQueue.size());
                try {
                    Thread.sleep(500);
                }
                catch (InterruptedException ex) {
                    System.out.println(ex);
                }
            }
        }
    }

    static class Producer implements Runnable {
        static DelayQueue<Delayed> delayQueue = DelayQueueTest.delayQueue;

        @Override
        public void run() {
            while (System.currentTimeMillis() - timeStart < executionTime) {
                System.out.println(String.format(
                        "Thread %s started offering at %d. Queue size: %d",
                        Thread.currentThread().getId(),
                        System.currentTimeMillis() - timeStart,
                        delayQueue.size())
                );
                delayQueue.offer(new DelayObject(300));
                System.out.println(String.format(
                        "Thread %s finished offering at %d. Queue size: %d",
                        Thread.currentThread().getId(),
                        System.currentTimeMillis() - timeStart,
                        delayQueue.size())
                );
                try {
                    Thread.sleep(300);
                }
                catch (InterruptedException ex) {
                    System.out.println(ex);
                }
            }
        }
    }
}

当我在

消费者线程
上调用take(),并在200毫秒后在
生产者线程
上调用offer()时,问题出现了。我期望消费者线程在
take()
获取锁时阻塞整个数据结构,因此
offer()
不会向队列中添加元素(因为
offer()
也想获取锁,但它被
offer() 锁定) 
)。所以,从全球范围来看,我预计会出现僵局。然而,这并没有发生。这是我的程序的输出:

Thread 15 started taking at 2
Thread 16 started offering at 214. Queue size: 0
Thread 16 finished offering at 217. Queue size: 1
Thread 16 started offering at 527. Queue size: 1
Thread 16 finished offering at 527. Queue size: 1
Thread 15 finished taking at 527... with result org.example.DelayQueueTest$DelayObject@7148b5a8
Queue size: 1
Thread 16 started offering at 828. Queue size: 1
Thread 16 finished offering at 828. Queue size: 2
Thread 15 started taking at 1035
Thread 15 finished taking at 1035... with result org.example.DelayQueueTest$DelayObject@21819116
Queue size: 1
Thread 16 started offering at 1143. Queue size: 1
Thread 16 finished offering at 1143. Queue size: 2
Thread 16 started offering at 1453. Queue size: 2
Thread 16 finished offering at 1453. Queue size: 3
Thread 15 started taking at 1548
Thread 15 finished taking at 1548... with result org.example.DelayQueueTest$DelayObject@6bf427cb
Queue size: 2
Thread 16 started offering at 1769. Queue size: 2
Thread 16 finished offering at 1769. Queue size: 3

你能解释一下,为什么我有这样的输出吗?为什么要加锁,而且已经加锁了,

offer()
却执行成功了?

此外,如果您可能想出这个

DelayQueue
类的潜在用例,我很乐意观察它们......

java multithreading priority-queue java.util.concurrent delayed-execution
1个回答
0
投票

正如您正确地注意到的那样,

DelayQueue.take()
获取与
DelayQueue
关联的锁,这可以防止其他线程与
DelayQueue
交互。

但是,如果没有可用元素,

take()
会调用
available.await();
(请参阅DelayQueue.take()
第 242 行和 249 行的
源)。

available
DelayQueue
锁相关联(source 第 103 和 129 行):

private final transient ReentrantLock lock = new ReentrantLock();
private final Condition available = lock.newCondition();

现在,如果您查找

ReentrantLock.newCondition()

的文档

返回一个

Condition
实例以与此
Lock
实例一起使用。 当与内置监视器锁一起使用时,返回的
Condition
实例支持与
Object
监视器方法(
wait
notify
notifyAll
)相同的用法。

  • [...]
  • 当条件等待方法被调用时,锁被释放,并且在它们返回之前,锁被重新获取,并且锁持有计数恢复到调用该方法时的值。
  • [...]

这意味着在调用

available.await()
时,可重入锁被释放,这允许其他线程进入
DelayQueue
的锁定方法。

© www.soinside.com 2019 - 2024. All rights reserved.