创建事件调度线程安全信号灯

问题描述 投票:2回答:1

我一直在尝试制作一个二进制信号量,该信号量将能够安全地阻止在事件分发线程(EDT)上运行的方法的执行,而不会实际阻止该线程处理更多事件。最初看来这似乎是不可能的,但是Java具有与此相关的一些内置功能,但我无法完全使其正常工作。

用例

[当前,如果从EDT中显示一个模态摆动对话框,它将似乎阻止了EDT(因为显示模态对话框的方法直到对话框关闭后才会继续到下一行),但实际上-使EDT进入新的事件循环的引擎盖魔术,该循环将继续调度事件,直到关闭模式对话框。

我的团队目前拥有从Swing缓慢迁移到JavaFX的应用程序(有些棘手的过渡),我希望能够以与显示Swing模态对话框相同的方式显示AWT事件分配线程中的模态JavaFX对话框。 。似乎拥有某种EDT安全信号灯将满足此用例,并可能在以后的其他用途中派上用场。

方法

java.awt.EventQueue.createSecondaryLoop()是创建SecondaryLoop对象的方法,然后您可以使用该方法启动新的事件处理循环。当您调用SecondaryLoop.enter()时,该调用将在处理新的事件循环时被阻止(请注意call会被阻止,但thread不会被阻止,因为它正在事件处理循环中继续)。新的事件循环将一直持续到您调用SecondaryLoop.exit()为止(这并不完全正确,请参见我的related SO question)。

因此,我创建了一个信号量,在该信号量中,要获取阻塞调用将导致等待正常线程的闩锁,或进入EDT的辅助循环。每次获取信号的阻塞调用还添加了一个释放信号时释放的阻塞操作(对于普通线程,它只是减小锁存器,对于EDT,它退出辅助循环)。

这是我的代码:


import java.awt.EventQueue;
import java.awt.SecondaryLoop;
import java.awt.Toolkit;
import java.util.Stack;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

@SuppressWarnings("serial")
public class EventDispatchThreadSafeBinarySemaphore extends Semaphore{

    /** Operations used to unblock threads when a semaphore is released.
     * Must be a stack because secondary loops have to be exited in the
     * reverse of the order in which they were entered in order to unblock
     * the execution of the method that entered the loop.
     */
    private Stack<Runnable> releaseOperations = new Stack<>();

    private boolean semaphoreAlreadyAcquired = false;


    public EventDispatchThreadSafeBinarySemaphore() {
        super(0);
    }

    @Override
    public boolean isFair() {
        return false;
    }

    @Override
    public void acquire() throws InterruptedException {

        Runnable blockingOperation = () -> {};

        synchronized(this) {
            if(semaphoreAlreadyAcquired) {

                //We didn't acquire the semaphore, need to set up an operation to execute
                //while we're waiting on the semaphore and an operation for another thread
                //to execute in order to unblock us when the semaphore becomes available

                if(EventQueue.isDispatchThread()) {

                    //For the EDT, we don't want to actually block, rather we'll enter a new loop that will continue
                    //processing AWT events.
                    SecondaryLoop temporaryAwtLoop = Toolkit.getDefaultToolkit().getSystemEventQueue().createSecondaryLoop();

                    releaseOperations.add(() -> temporaryAwtLoop.exit());

                    blockingOperation = () -> {

                        if(!temporaryAwtLoop.enter()) {
                            //I don't think we'll run into this, but I'm leaving this here for now for debug purposes
                            System.err.println("Failed to enter event loop");
                        }
                    };
                }
                else {

                    //Non-dispatch thread is a little simpler, we'll just wait on a latch
                    CountDownLatch blockedLatch = new CountDownLatch(1);
                    releaseOperations.add(() -> blockedLatch.countDown());
                    blockingOperation = () -> {
                        try {
                            blockedLatch.await();
                        } catch (InterruptedException e) {
                            //I'll worry about handling this better once I have the basics figured out
                            e.printStackTrace();
                        }
                    };
                }
            }
            else {
                semaphoreAlreadyAcquired = true;
            }
        }

        //This part must be executed outside of the synchronized block so that we don't block
        //the EDT if it tries to acquire the semaphore while this statement is blocked
        blockingOperation.run();

    }

    @Override
    public void release() {
        synchronized(this) {
            if(releaseOperations.size() > 0) {
                //Release the last blocked thread
                releaseOperations.pop().run();
            }
            else {
                semaphoreAlreadyAcquired = false;
            }
        }
    }

}

这是我相关的JUnit测试代码(我为大尺寸表示歉意,这是到目前为止我能提出的最小的最小可验证示例:]

public class TestEventDispatchThreadSafeBinarySemaphore {

    private static EventDispatchThreadSafeBinarySemaphore semaphore;
        //See https://stackoverflow.com/questions/58192008/secondaryloop-enter-not-blocking-until-exit-is-called-on-the-edt
        //for why we need this timer
        private static Timer timer = new Timer(500, null);
        @BeforeClass
    public static void setupClass() {
        timer.start();
    }

    @Before
    public void setup() {
        semaphore = new EventDispatchThreadSafeBinarySemaphore();
    }
        @AfterClass
    public static void cleanupClass() {
        timer.stop();
    }

        //This test passes just fine
        @Test(timeout = 1000)
    public void testBlockingAcquireReleaseOnEDT() throws InterruptedException {

        semaphore.acquire();

        CountDownLatch edtCodeStarted = new CountDownLatch(1);
        CountDownLatch edtCodeFinished = new CountDownLatch(1);

        SwingUtilities.invokeLater(() -> {
            //One countdown to indicate that this has begun running
            edtCodeStarted.countDown();
            try {
                semaphore.acquire();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            //This countdown indicates that it has finished running
            edtCodeFinished.countDown();

        });

        //Ensure that the code on the EDT has started
        edtCodeStarted.await();

        assertEquals("Code on original AWT event thread should still be blocked", 1, edtCodeFinished.getCount());

        //Ensure that things can still run on the EDT
        CountDownLatch edtActiveCheckingLatch = new CountDownLatch(1);
        SwingUtilities.invokeLater(() -> edtActiveCheckingLatch.countDown());

        //If we get past this line, then we know that the EDT is live even though the 
        //code in the invokeLater call is blocked
        edtActiveCheckingLatch.await();

        assertEquals("Code on original AWT event thread should still be blocked", 1, edtCodeFinished.getCount());

        semaphore.release();

        //If we get past this line, then the code on the EDT got past the semaphore
        edtCodeFinished.await();
    }

        //This test fails intermittently, but so far only after the previous test was run first
    @Test(timeout = 10000)
    public void testConcurrentAcquiresOnEDT() throws InterruptedException {

        int numThreads =100;

        CountDownLatch doneLatch = new CountDownLatch(numThreads);

        try {
            semaphore.acquire();

            //Queue up a bunch of threads to acquire and release the semaphore
            //as soon as it becomes available
            IntStream.range(0, numThreads)
                    .parallel()
                    .forEach((threadNumber) -> 
                        SwingUtilities.invokeLater(() -> {
                            try {
                                semaphore.acquire();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            finally {
                                semaphore.release();
                                //Count down the latch to indicate that the thread terminated
                                doneLatch.countDown();
                            }
                        })
                    );

            semaphore.release();

            doneLatch.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

问题

testConcurrentAcquiresOnEDT有时会通过,有时会失败。我相信我知道为什么。我研究了Java源代码,并在WaitDispatchSupportSecondaryLoop的具体实现)中,循环基本上继续调度事件,直到清除了名为keepBlockingEDT的标志为止。它将在事件之间进行检查。当我调用exit时,它将清除该标志并发送事件以唤醒事件队列,以防它正在等待更多事件。但是,它不会导致enter()方法立即退出(并且我认为无论如何都不可能)。

所以这是死锁的结果:

  • 主线程获取信号量
  • EDT线程尝试获取信号量,但是已经被获取,因此:
    • 创建一个新的辅助循环
    • 创建一个Runnable,它将退出新的辅助循环并将其推入releaseOperations堆栈
    • 进入辅助循环,导致执行阻塞(请注意,此最后一步必然在synchronized块之外
  • 主线程释放信号量,这导致发生以下情况:
    • 弹出releaseOperations堆栈,并在辅助循环上调用exit
    • exit调用,将该次级循环的keepBlockingEDT标志设置为false
  • 在EDT中,刚刚检查完keepBlockingEDT标志(就在将其设置为false之前,并且正在获取下一个事件。)>
  • 事实证明,下一个事件是另一个在信号量上阻塞的可运行对象,因此它试图获取该信号
  • 这将在原始SecondaryLoop的顶部创建另一个SecondaryLoop并输入它
  • 至此,原始SecondaryLoop已经清除了它的keepBlockingEDT标志,并且可以停止阻塞,除了当前正在阻塞运行第二个SecondaryLoop之外。第二个SecondaryLoop永远不会调用它,因为现在还没有人真正获得该信号灯,因此我们将永远阻塞。
  • 我已经为此工作了几天,我想到的每个想法都是死胡同。

我相信我有一个可能的部分解决方案,那就是根本不允许一次在信号量上阻塞一个以上的线程(如果另一个线程试图获取它,我将抛出IllegalStateException)。如果它们各自使用自己的信号量,我仍然可以进行多个次要循环,但是每个信号量最多只能创建1个次要循环。我认为这可行,并且可以很好地满足我最可能的用例(因为大多数情况下,我只想显示事件线程中的单个JavaFX模态对话框)。我只是想知道是否还有其他想法,因为我觉得我已经接近做出一些很酷的东西了,但是那根本行不通。

让我知道您是否有任何想法。而且“我很确定这是不可能的,这就是原因...”也是可以接受的答案。

我一直在尝试制作一个二进制信号量,该信号量将能够安全地阻止在事件分发线程(EDT)上运行的方法的执行,而不会实际阻止该线程处理更多......>

java multithreading event-dispatch-thread
1个回答
0
投票
阅读API后,您似乎变得过于复杂。使用Semaphore本质上是阻塞的,但是您要做的就是阻止当前代码继续运行,直到JavaFX对话框关闭为止-至少,这就是我理解您的问题的方式。考虑到这一点,我相信以下将满足您的要求:

import java.awt.EventQueue; import java.awt.SecondaryLoop; import java.awt.Toolkit; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import javafx.application.Platform; import javafx.scene.control.Dialog; public class Foo { public static <R> Optional<R> showFxDialogAndWaitOnEDT(Dialog<R> dialog) { if (!EventQueue.isDispatchThread()) { throw new IllegalStateException("current thread is not the EDT"); } AtomicReference<Optional<R>> valueRef = new AtomicReference<>(); SecondaryLoop loop = Toolkit.getDefaultToolkit().getSystemEventQueue().createSecondaryLoop(); Platform.runLater(() -> { valueRef.set(dialog.showAndWait()); loop.exit(); }); loop.enter(); // check return value? return valueRef.get(); } }

© www.soinside.com 2019 - 2024. All rights reserved.