我正在制作一个包含两个线程的应用程序:其中一个将值写入LinkedBlockingQueue,另一个正在读取。我正在使用ScheduledExecutorService在几秒钟内运行此操作。问题是我的应用程序冻结了BlockingQueue的方法,我无法理解原因。
这是常见资源:
class Res{
AtomicInteger atomicInteger = new AtomicInteger(0);
BlockingQueue<String> q = new LinkedBlockingQueue<>();
}
这是读者
Semaphore semaphore = new Semaphore(1); /this is for reader does not take two places in thread pool
Runnable reader = ()->{
try {
semaphore.acquire();
System.out.println(res.q.take()+" "+res.atomicInteger.incrementAndGet());
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
作家:
Runnable writer = ()->{
res.q.add("hi");
};
完整代码:
class Res{
AtomicInteger atomicInteger = new AtomicInteger(0);
BlockingQueue<String> q = new LinkedBlockingQueue<>();
}
public class Main {
public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
Res res = new Res();
Semaphore semaphore = new Semaphore(1); //this is for reader does not take two places in thread pool
Runnable reader = ()->{
try {
semaphore.acquire();
System.out.println(res.q.take()+" "+res.atomicInteger.incrementAndGet());
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
Runnable writer = ()->{
res.q.add("hi");
};
Random rnd = new Random();
for (int i = 0; i < 20; i++) {
int time = rnd.nextInt(5)+ 2;
executorService.schedule(writer,time, TimeUnit.SECONDS);
}
for (int i = 0; i < 20; i++) {
int time = rnd.nextInt(5)+ 2;
executorService.schedule(reader,time, TimeUnit.SECONDS);
}
executorService.shutdown();
}
它应该打印二十行“ hi [number]”,但冻结在某行上。例如,我当前的打印:
hi 1
hi 2
hi 3
hi 4
hi 5
我发现如果我增加线程数newScheduledThreadPool(20)
,它将开始工作,但是如何用两个线程来完成呢?谢谢!
尽管同时发生的事情很明显,但是遵循您的代码有点困难。由于Executors.newScheduledThreadPool(2);
,您一次最多可以运行两个线程。这两个线程都是reader
线程。
因此Thread-1
进入try
块并通过semaphore.acquire();
获得了信号灯许可,但是队列为空-因此,它在res.q.take()
上被阻塞。下一个线程-Thread-2
也是读取器线程,但是它无法获取permit
,因为它已被Thread-1
占用,并在semaphore.acquire();
上被阻塞。由于您没有足够的空间容纳其他线程(您的池被阻止使用这两个线程),因此没有任何编写器会将某些内容放入您的队列中并因此取消阻止Thread-1
(以便res.q.take()
可以工作)。
添加更多工作线程只会延迟问题-您可能最终会回到以前的位置。