使用 Selector 和 SelectionKeys 委托给线程池

问题描述 投票:0回答:2

我正在编写一个应用程序,其中主线程通过使用

Selector
SelectionKeys
处理许多连接。当我尝试将任务传递给工作线程时,我遇到了一些竞争条件问题。

我的主循环如下所示:

    selector = Selector.open(); //Create selector

    serverSocketChannel = ServerSocketChannel.open(); //Create socket channel, configure blocking, and bind
    serverSocketChannel.configureBlocking(false);
    serverSocketChannel.bind(new InetSocketAddress(PORT));

    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //Register channel to selector

    ByteBuffer buffer = ByteBuffer.allocate(8000);

    while(true){
        selector.select();

        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();

        while(iterator.hasNext()){
            SelectionKey key = iterator.next();

            if(key.isAcceptable()){
                SocketChannel socketChannel = serverSocketChannel.accept();
                socketChannel.configureBlocking(false);
                socketChannel.register(selector, SelectionKey.OP_READ);
            }
            if(key.isReadable()){
                taskList.add(new ReadTask(key));
            }
            if(key.isWritable()){

            }

            iterator.remove();
        }
    }

这里的想法是,当客户端尝试向服务器发送数据时,它会收到具有 OP_READ 兴趣的密钥,然后使用该密钥创建一个任务,以便线程池可以处理读取,以免阻塞主线程。

问题在于,在将密钥传递给工作线程的过程中,调用此循环会继续进行,并且在调用

taskList.add(new ReadTask(key));
和最终调用
key.channel().read(buffer)
之间的整个时间,主线程仍在迭代并看到该键仍处于选中状态。在密钥的通道上调用 read 后,密钥被标记为非活动状态,并且似乎不会被选择器选择,直到来自客户端之一的另一次合法写入提示密钥再次被激活。

有没有办法让我标记该键,以免选择器在不调用 read 的情况下将其添加回所选键的列表中?我已经尝试过

selector.selectedKeys.remove(key)
,但这会产生 ConcurrentModification 异常。

java multithreading
2个回答
0
投票

您应该在 select 循环中进行读取,然后启动工作程序来处理数据并准备响应,或者从选择键的

interestOps
中删除 OP_READ,直到发送响应为止。


0
投票

据我了解,没有什么可以阻止我们并行处理由于一次

SelectorKey
调用而选择的所有
selector.select()
。以下代码展示了可在同步和异步模式之间切换的处理示例:

        selector = Selector.open();

        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(new InetSocketAddress(host, port));
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        final boolean synchronousMode = false;
        while (...) {
            selector.select();
            final Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Collection<CompletableFuture<?>> futures = new ArrayList<>(selectedKeys.size());
            for (SelectionKey selectionKey: selector.selectedKeys()) {
                if (selectionKey.isAcceptable()) {
                    acceptAndRegister();
                }
                if (selectionKey.isReadable()) {
                    if (synchronousMode) {
                        processRequest(selectionKey); // sync processing
                    } else {
                        CompletableFuture<Void> f 
                           = CompletableFuture.runAsync(() -> processRequest(selectionKey)); // async processing
                        futures.add(f);
                    }
                }
            }
            if (!synchronousMode) {
                CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).get(); // join all accumulated futures
            }
            selectedKeys.clear();
        }
© www.soinside.com 2019 - 2024. All rights reserved.