这个实现正确吗?

问题描述 投票:0回答:0

我想知道这个实现是否正确。

场景是:

  • 一个制作人
  • 未知数量的消费者(0-n)
  • 生产者只需向消费者传达
    resume work
    production has stopped
    信号
  • 消费者自己处理数据检查,因此即使没有任何新数据存在的一些冗余
    resume work
    信号也是可以接受的(所以这基本上只是为了将繁忙等待减少到最低限度,不保证100%避免它)
  • 消费者也不要在数据上竞争。数据不会被消耗,只是存在更多可以处理的数据。 (如果有消耗,我会使用信号量或BlockingQueues)
  • 使用
signalEnd()

,这也应该可以防止错过

Thread.notify()
:以防
notify
ing 线程在
wait
ing 线程之前运行。
代码:

package jc.lib.lang.thread; /** * Consumer Threads wait for signals from Producer Threads.<br> * <ul> * <li>Consumers wait for data ({@link #awaitProgress(long, int)}), work, wait again for more data. Can re-enlist multiple times.</li> * <li>Producers send {@link #signalProgress()} if they made more data available.</li> * <li>Procuders signal end ({@link #signalEnd()}) to indicate that no more data will be available.</li> * </ul> * * @author jc * @since 2023-07-20 */ public final class JcCyclicLatch { private final Object mSyncObject = new Object(); private volatile boolean mWorkEnded = false; public JcCyclicLatch() {} /** * Consumer Threads wait for Producer Thread to signal new data. * @return <b>true</b> if new data is present (continue working).<br> <b>false</b> if work has been completed (stop working). * @see {@linkplain Object#wait(long, int)} */ public boolean awaitProgress(final long pTimeoutMs, final int pTimeoutNs) { if (mWorkEnded) return false; synchronized (mSyncObject) { try { if (mWorkEnded) return false; mSyncObject.wait(pTimeoutMs, pTimeoutNs); } catch (final InterruptedException e) { /* cannot happen, we ourselves control the sync object */ } } return !mWorkEnded; } /** * @see #awaitProgress(long, int) */ public boolean awaitProgress(final long pTimeoutMs) { return awaitProgress(pTimeoutMs, 0); } /** * @see #awaitProgress(long, int) */ public boolean awaitProgress() { return awaitProgress(0, 0); } /** * Producer Thread signals availability of new data. * @throws InterruptedException */ public void signalProgress() { synchronized (mSyncObject) { mSyncObject.notifyAll(); } } /** * Break open lock so that all registered Consumers will pass. Future Consumers will also pass rigth through. */ public void signalEnd() { mWorkEnded = true; signalProgress(); } /** * Resets to inital status. */ public void reset() { mWorkEnded = false; } /* * Example method */ public static void main(final String[] args) { final JcCyclicLatch l = new JcCyclicLatch(); // start consumers for (int i = 0; i < 3; i++) { final int index = i; final Thread consumerThread = new Thread(() -> { System.out.println("C " + index + " started."); while (true) { System.out.println("C " + index + " waiting..."); final boolean continueWorking = l.awaitProgress(); if (!continueWorking) { System.out.println("C " + index + " is done working."); break; } // ... work stuff System.out.println("C " + index + " working."); // ... work stuff } System.out.println("C " + index + " ending."); }); consumerThread.start(); } // this is the producer for (int i = 0; i < 3; i++) { System.out.println("P working..."); try { Thread.sleep(1000); // simulates work } catch (final InterruptedException e) { /* */ } System.out.println("P has new data."); l.signalProgress(); } System.out.println("P is ending..."); l.signalEnd(); System.out.println("P finished."); } }

如果只涉及 1 个生产者和 1 个消费者,我会使用包含数据包的 
LinkedBlockingQueue

,最后使用一个特殊的包来发出信号

production has stopped
所以我的实际问题是,

该实施是否正确和/或应该/可以在以下方面进一步改进:

它是否可以防止锁定(尽可能在

obj.wait()
    之后使用
  • obj.notify()
    )?
    是否
    减少
  • 忙碌的等待?
  • 其他问题,在这里收集意见,在答案中考虑或评论:

由于这是基本场景,是否有任何默认的java类可以以同样小的代码复杂度完成此任务?

    我不确定这个名字
  1. CyclicLatch
  2. 是否合适。在所有
  3. Latch
    Lock
    Barrier
    Sempahore
    Phaser
     之间,我感到非常困惑。你会给它起什么名字?
        
java multithreading concurrency deadlock synchronized
© www.soinside.com 2019 - 2024. All rights reserved.