我想知道这个实现是否正确。
场景是:
resume work
或production has stopped
信号resume work
信号也是可以接受的(所以这基本上只是为了将繁忙等待减少到最低限度,不保证100%避免它)signalEnd()
,这也应该可以防止错过
Thread.notify()
:以防 notify
ing 线程在 wait
ing 线程之前运行。代码:
package jc.lib.lang.thread;
/**
* Consumer Threads wait for signals from Producer Threads.<br>
* <ul>
* <li>Consumers wait for data ({@link #awaitProgress(long, int)}), work, wait again for more data. Can re-enlist multiple times.</li>
* <li>Producers send {@link #signalProgress()} if they made more data available.</li>
* <li>Procuders signal end ({@link #signalEnd()}) to indicate that no more data will be available.</li>
* </ul>
*
* @author jc
* @since 2023-07-20
*/
public final class JcCyclicLatch {
private final Object mSyncObject = new Object();
private volatile boolean mWorkEnded = false;
public JcCyclicLatch() {}
/**
* Consumer Threads wait for Producer Thread to signal new data.
* @return <b>true</b> if new data is present (continue working).<br> <b>false</b> if work has been completed (stop working).
* @see {@linkplain Object#wait(long, int)}
*/
public boolean awaitProgress(final long pTimeoutMs, final int pTimeoutNs) {
if (mWorkEnded) return false;
synchronized (mSyncObject) {
try {
if (mWorkEnded) return false;
mSyncObject.wait(pTimeoutMs, pTimeoutNs);
} catch (final InterruptedException e) { /* cannot happen, we ourselves control the sync object */ }
}
return !mWorkEnded;
}
/**
* @see #awaitProgress(long, int)
*/
public boolean awaitProgress(final long pTimeoutMs) {
return awaitProgress(pTimeoutMs, 0);
}
/**
* @see #awaitProgress(long, int)
*/
public boolean awaitProgress() {
return awaitProgress(0, 0);
}
/**
* Producer Thread signals availability of new data.
* @throws InterruptedException
*/
public void signalProgress() {
synchronized (mSyncObject) {
mSyncObject.notifyAll();
}
}
/**
* Break open lock so that all registered Consumers will pass. Future Consumers will also pass rigth through.
*/
public void signalEnd() {
mWorkEnded = true;
signalProgress();
}
/**
* Resets to inital status.
*/
public void reset() {
mWorkEnded = false;
}
/*
* Example method
*/
public static void main(final String[] args) {
final JcCyclicLatch l = new JcCyclicLatch();
// start consumers
for (int i = 0; i < 3; i++) {
final int index = i;
final Thread consumerThread = new Thread(() -> {
System.out.println("C " + index + " started.");
while (true) {
System.out.println("C " + index + " waiting...");
final boolean continueWorking = l.awaitProgress();
if (!continueWorking) {
System.out.println("C " + index + " is done working.");
break;
}
// ... work stuff
System.out.println("C " + index + " working.");
// ... work stuff
}
System.out.println("C " + index + " ending.");
});
consumerThread.start();
}
// this is the producer
for (int i = 0; i < 3; i++) {
System.out.println("P working...");
try {
Thread.sleep(1000); // simulates work
} catch (final InterruptedException e) { /* */ }
System.out.println("P has new data.");
l.signalProgress();
}
System.out.println("P is ending...");
l.signalEnd();
System.out.println("P finished.");
}
}
如果只涉及 1 个生产者和 1 个消费者,我会使用包含数据包的
LinkedBlockingQueue
,最后使用一个特殊的包来发出信号
production has stopped
。所以我的实际问题是,该实施是否正确和/或应该/可以在以下方面进一步改进:它是否可以防止锁定(尽可能在
obj.wait()
obj.notify()
)?
是否减少
由于这是基本场景,是否有任何默认的java类可以以同样小的代码复杂度完成此任务?
CyclicLatch
Latch
和
Lock
和
Barrier
和
Sempahore
和
Phaser
之间,我感到非常困惑。你会给它起什么名字?