我正在编写一个应用程序,其中主线程通过使用
Selector
和 SelectionKeys
处理许多连接。当我尝试将任务传递给工作线程时,我遇到了一些竞争条件问题。
我的主循环如下所示:
selector = Selector.open(); //Create selector
serverSocketChannel = ServerSocketChannel.open(); //Create socket channel, configure blocking, and bind
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(PORT));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //Register channel to selector
ByteBuffer buffer = ByteBuffer.allocate(8000);
while(true){
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
if(key.isAcceptable()){
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
if(key.isReadable()){
taskList.add(new ReadTask(key));
}
if(key.isWritable()){
}
iterator.remove();
}
}
这里的想法是,当客户端尝试向服务器发送数据时,它会收到具有 OP_READ 兴趣的密钥,然后使用该密钥创建一个任务,以便线程池可以处理读取,以免阻塞主线程。
问题在于,在将密钥传递给工作线程的过程中,调用此循环会继续进行,并且在调用
taskList.add(new ReadTask(key));
和最终调用 key.channel().read(buffer)
之间的整个时间,主线程仍在迭代并看到该键仍处于选中状态。在密钥的通道上调用 read 后,密钥被标记为非活动状态,并且似乎不会被选择器选择,直到来自客户端之一的另一次合法写入提示密钥再次被激活。
有没有办法让我标记该键,以免选择器在不调用 read 的情况下将其添加回所选键的列表中?我已经尝试过
selector.selectedKeys.remove(key)
,但这会产生 ConcurrentModification 异常。
您应该在 select 循环中进行读取,然后启动工作程序来处理数据并准备响应,或者从选择键的
interestOps
中删除 OP_READ,直到发送响应为止。
据我了解,没有什么可以阻止我们并行处理由于一次
SelectorKey
调用而选择的所有 selector.select()
。以下代码展示了可在同步和异步模式之间切换的处理示例:
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(host, port));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
final boolean synchronousMode = false;
while (...) {
selector.select();
final Set<SelectionKey> selectedKeys = selector.selectedKeys();
Collection<CompletableFuture<?>> futures = new ArrayList<>(selectedKeys.size());
for (SelectionKey selectionKey: selector.selectedKeys()) {
if (selectionKey.isAcceptable()) {
acceptAndRegister();
}
if (selectionKey.isReadable()) {
if (synchronousMode) {
processRequest(selectionKey); // sync processing
} else {
CompletableFuture<Void> f
= CompletableFuture.runAsync(() -> processRequest(selectionKey)); // async processing
futures.add(f);
}
}
}
if (!synchronousMode) {
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).get(); // join all accumulated futures
}
selectedKeys.clear();
}