我一直在尝试制作一个二进制信号量,该信号量将能够安全地阻止在事件分发线程(EDT)上运行的方法的执行,而不会实际阻止该线程处理更多事件。最初看来这似乎是不可能的,但是Java具有与此相关的一些内置功能,但我无法完全使其正常工作。
[当前,如果从EDT中显示一个模态摆动对话框,它将似乎阻止了EDT(因为显示模态对话框的方法直到对话框关闭后才会继续到下一行),但实际上-使EDT进入新的事件循环的引擎盖魔术,该循环将继续调度事件,直到关闭模式对话框。
我的团队目前拥有从Swing缓慢迁移到JavaFX的应用程序(有些棘手的过渡),我希望能够以与显示Swing模态对话框相同的方式显示AWT事件分配线程中的模态JavaFX对话框。 。似乎拥有某种EDT安全信号灯将满足此用例,并可能在以后的其他用途中派上用场。
java.awt.EventQueue.createSecondaryLoop()
是创建SecondaryLoop
对象的方法,然后您可以使用该方法启动新的事件处理循环。当您调用SecondaryLoop.enter()
时,该调用将在处理新的事件循环时被阻止(请注意call会被阻止,但thread不会被阻止,因为它正在事件处理循环中继续)。新的事件循环将一直持续到您调用SecondaryLoop.exit()
为止(这并不完全正确,请参见我的related SO question)。
因此,我创建了一个信号量,在该信号量中,要获取阻塞调用将导致等待正常线程的闩锁,或进入EDT的辅助循环。每次获取信号的阻塞调用还添加了一个释放信号时释放的阻塞操作(对于普通线程,它只是减小锁存器,对于EDT,它退出辅助循环)。
这是我的代码:
import java.awt.EventQueue;
import java.awt.SecondaryLoop;
import java.awt.Toolkit;
import java.util.Stack;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
@SuppressWarnings("serial")
public class EventDispatchThreadSafeBinarySemaphore extends Semaphore{
/** Operations used to unblock threads when a semaphore is released.
* Must be a stack because secondary loops have to be exited in the
* reverse of the order in which they were entered in order to unblock
* the execution of the method that entered the loop.
*/
private Stack<Runnable> releaseOperations = new Stack<>();
private boolean semaphoreAlreadyAcquired = false;
public EventDispatchThreadSafeBinarySemaphore() {
super(0);
}
@Override
public boolean isFair() {
return false;
}
@Override
public void acquire() throws InterruptedException {
Runnable blockingOperation = () -> {};
synchronized(this) {
if(semaphoreAlreadyAcquired) {
//We didn't acquire the semaphore, need to set up an operation to execute
//while we're waiting on the semaphore and an operation for another thread
//to execute in order to unblock us when the semaphore becomes available
if(EventQueue.isDispatchThread()) {
//For the EDT, we don't want to actually block, rather we'll enter a new loop that will continue
//processing AWT events.
SecondaryLoop temporaryAwtLoop = Toolkit.getDefaultToolkit().getSystemEventQueue().createSecondaryLoop();
releaseOperations.add(() -> temporaryAwtLoop.exit());
blockingOperation = () -> {
if(!temporaryAwtLoop.enter()) {
//I don't think we'll run into this, but I'm leaving this here for now for debug purposes
System.err.println("Failed to enter event loop");
}
};
}
else {
//Non-dispatch thread is a little simpler, we'll just wait on a latch
CountDownLatch blockedLatch = new CountDownLatch(1);
releaseOperations.add(() -> blockedLatch.countDown());
blockingOperation = () -> {
try {
blockedLatch.await();
} catch (InterruptedException e) {
//I'll worry about handling this better once I have the basics figured out
e.printStackTrace();
}
};
}
}
else {
semaphoreAlreadyAcquired = true;
}
}
//This part must be executed outside of the synchronized block so that we don't block
//the EDT if it tries to acquire the semaphore while this statement is blocked
blockingOperation.run();
}
@Override
public void release() {
synchronized(this) {
if(releaseOperations.size() > 0) {
//Release the last blocked thread
releaseOperations.pop().run();
}
else {
semaphoreAlreadyAcquired = false;
}
}
}
}
这是我相关的JUnit测试代码(我为大尺寸表示歉意,这是到目前为止我能提出的最小的最小可验证示例:]
public class TestEventDispatchThreadSafeBinarySemaphore {
private static EventDispatchThreadSafeBinarySemaphore semaphore;
//See https://stackoverflow.com/questions/58192008/secondaryloop-enter-not-blocking-until-exit-is-called-on-the-edt
//for why we need this timer
private static Timer timer = new Timer(500, null);
@BeforeClass
public static void setupClass() {
timer.start();
}
@Before
public void setup() {
semaphore = new EventDispatchThreadSafeBinarySemaphore();
}
@AfterClass
public static void cleanupClass() {
timer.stop();
}
//This test passes just fine
@Test(timeout = 1000)
public void testBlockingAcquireReleaseOnEDT() throws InterruptedException {
semaphore.acquire();
CountDownLatch edtCodeStarted = new CountDownLatch(1);
CountDownLatch edtCodeFinished = new CountDownLatch(1);
SwingUtilities.invokeLater(() -> {
//One countdown to indicate that this has begun running
edtCodeStarted.countDown();
try {
semaphore.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//This countdown indicates that it has finished running
edtCodeFinished.countDown();
});
//Ensure that the code on the EDT has started
edtCodeStarted.await();
assertEquals("Code on original AWT event thread should still be blocked", 1, edtCodeFinished.getCount());
//Ensure that things can still run on the EDT
CountDownLatch edtActiveCheckingLatch = new CountDownLatch(1);
SwingUtilities.invokeLater(() -> edtActiveCheckingLatch.countDown());
//If we get past this line, then we know that the EDT is live even though the
//code in the invokeLater call is blocked
edtActiveCheckingLatch.await();
assertEquals("Code on original AWT event thread should still be blocked", 1, edtCodeFinished.getCount());
semaphore.release();
//If we get past this line, then the code on the EDT got past the semaphore
edtCodeFinished.await();
}
//This test fails intermittently, but so far only after the previous test was run first
@Test(timeout = 10000)
public void testConcurrentAcquiresOnEDT() throws InterruptedException {
int numThreads =100;
CountDownLatch doneLatch = new CountDownLatch(numThreads);
try {
semaphore.acquire();
//Queue up a bunch of threads to acquire and release the semaphore
//as soon as it becomes available
IntStream.range(0, numThreads)
.parallel()
.forEach((threadNumber) ->
SwingUtilities.invokeLater(() -> {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
finally {
semaphore.release();
//Count down the latch to indicate that the thread terminated
doneLatch.countDown();
}
})
);
semaphore.release();
doneLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
testConcurrentAcquiresOnEDT
有时会通过,有时会失败。我相信我知道为什么。我研究了Java源代码,并在WaitDispatchSupport
(SecondaryLoop
的具体实现)中,循环基本上继续调度事件,直到清除了名为keepBlockingEDT
的标志为止。它将在事件之间进行检查。当我调用exit
时,它将清除该标志并发送事件以唤醒事件队列,以防它正在等待更多事件。但是,它不会导致enter()
方法立即退出(并且我认为无论如何都不可能)。
所以这是死锁的结果:
releaseOperations
堆栈synchronized
块之外releaseOperations
堆栈,并在辅助循环上调用exit
exit
调用,将该次级循环的keepBlockingEDT
标志设置为falsekeepBlockingEDT
标志(就在将其设置为false之前,并且正在获取下一个事件。)>SecondaryLoop
的顶部创建另一个SecondaryLoop
并输入它SecondaryLoop
已经清除了它的keepBlockingEDT
标志,并且可以停止阻塞,除了当前正在阻塞运行第二个SecondaryLoop
之外。第二个SecondaryLoop
永远不会调用它,因为现在还没有人真正获得该信号灯,因此我们将永远阻塞。我已经为此工作了几天,我想到的每个想法都是死胡同。
我相信我有一个可能的部分解决方案,那就是根本不允许一次在信号量上阻塞一个以上的线程(如果另一个线程试图获取它,我将抛出IllegalStateException)。如果它们各自使用自己的信号量,我仍然可以进行多个次要循环,但是每个信号量最多只能创建1个次要循环。我认为这可行,并且可以很好地满足我最可能的用例(因为大多数情况下,我只想显示事件线程中的单个JavaFX模态对话框)。我只是想知道是否还有其他想法,因为我觉得我已经接近做出一些很酷的东西了,但是那根本行不通。
让我知道您是否有任何想法。而且“我很确定这是不可能的,这就是原因...”也是可以接受的答案。
我一直在尝试制作一个二进制信号量,该信号量将能够安全地阻止在事件分发线程(EDT)上运行的方法的执行,而不会实际阻止该线程处理更多......>
Semaphore
本质上是阻塞的,但是您要做的就是阻止当前代码继续运行,直到JavaFX对话框关闭为止-至少,这就是我理解您的问题的方式。考虑到这一点,我相信以下将满足您的要求:import java.awt.EventQueue;
import java.awt.SecondaryLoop;
import java.awt.Toolkit;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import javafx.application.Platform;
import javafx.scene.control.Dialog;
public class Foo {
public static <R> Optional<R> showFxDialogAndWaitOnEDT(Dialog<R> dialog) {
if (!EventQueue.isDispatchThread()) {
throw new IllegalStateException("current thread is not the EDT");
}
AtomicReference<Optional<R>> valueRef = new AtomicReference<>();
SecondaryLoop loop = Toolkit.getDefaultToolkit().getSystemEventQueue().createSecondaryLoop();
Platform.runLater(() -> {
valueRef.set(dialog.showAndWait());
loop.exit();
});
loop.enter(); // check return value?
return valueRef.get();
}
}