Java NIO 非阻塞读写操作

问题描述 投票:0回答:1

我正在为一个项目开发一个 NIO 服务器,该服务器将来自客户端的消息作为输入,其中包含读取和写入操作的运行时间。 我有一个问题,因为在第一次执行客户端时一切正常,但如果我再次运行客户端,服务器就会卡在可写部分。 你能告诉我我做错了什么吗?这些是我的文件,提前谢谢你。

MyAsyncProcessor.java

public class MyAsyncProcessor {

    enum States {
        Idle,
        Read,
        Write
    }

    ExecutorService pool;
    private Map<Integer, States> socketStates = new HashMap<>();

    public MyAsyncProcessor() {
    }

    public static void main(String[] args) throws IOException {
        new MyAsyncProcessor().process();
    }

    public void process() throws IOException {
        pool = Executors.newFixedThreadPool(2);
        InetAddress host = InetAddress.getByName("localhost");
        Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(host, 9876));
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        SelectionKey key;
        while (true) {
            if (selector.select() > 0) {
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> i = selectedKeys.iterator();
                while (i.hasNext()) {
                    key = i.next();
                    i.remove();
                    MyTask task = new MyTask();
                    if (key.isAcceptable()) {
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        socketChannel.configureBlocking(false);
                        System.out.println("Channel hashCode: " + socketChannel.hashCode());
                        socketChannel.register(selector, SelectionKey.OP_READ + SelectionKey.OP_WRITE);
                        socketStates.put(socketChannel.hashCode(), States.Idle);
                        System.out.println("Connection accepted from: " + socketChannel.getLocalAddress());
                    }
                    if (key.isReadable()) {
                        System.out.println("Readable");
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        States socketState = socketStates.get(socketChannel.hashCode());
                        if (socketState == States.Idle) {
                            socketStates.put(socketChannel.hashCode(), States.Read);
                            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                            try {
                                socketChannel.read(byteBuffer);
                                String result = new String(byteBuffer.array()).trim();
                                String[] words = result.split(" ");
                                int secondsToRead = Integer.parseInt(words[words.length - 2])*1000;
                                int secondsToWrite = Integer.parseInt(words[words.length - 1])*1000;
                                task.setTimeToRead(secondsToRead);
                                task.setTimeToWrite(secondsToWrite);
                                System.out.println(task.getTimeToRead() + " " + task.getTimeToWrite());
                                Runnable h = new MyAsyncReadThread(task);
                                pool.execute(h);
                                socketChannel.register(selector, SelectionKey.OP_WRITE);
                            } catch (Exception e) {
                                System.out.println("Closing Connection Read...");
                            }
                        }
                    }
                    if (key.isWritable()) {
                        System.out.println("Writable");
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        States socketState = socketStates.get(socketChannel.hashCode());
                        if (socketState == States.Read) {
                            socketStates.put(socketChannel.hashCode(), States.Write);
                            System.out.println(task.getTimeToRead() + " " + task.getTimeToWrite());
                            Runnable h = new MyAsyncWriteThread(task);
                            pool.execute(h);
                        }
                        key.cancel();
                    }
                }
            }
        }
    }
}

MyClient.java

public class MyClient  {

    public static void main(String [] args) {

        Random rand = new Random();
        int secondsToRead = rand.nextInt(10);
        int secondsToWrite = secondsToRead + 1;
        String message = "Seconds for the task to be read and written: " + secondsToRead + " " + secondsToWrite;
        System.out.println(message);
        Socket socket;
        try {
            socket = new Socket("127.0.0.1", 9876);
            PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);
            printWriter.println(message);
            System.out.println("Sending message");
        } catch (IOException e) {
            System.out.println("Error in Socket");
            System.exit(-1);
        }
    }
}




java multithreading nio nonblocking
1个回答
0
投票

对不起,我不能发表评论。

你不能使用

key.cancel();
,我不知道你的业务,只有我可以建议不要那样使用地图。

JDK.NIO
很辛苦。这是你的代码(稍微改变一下),希望对你有用。

不要自己写NIO代码买。 [https://netty.io/][Netty] 不错

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author [email protected]
 * @since 2022/12/13 09:46
 */
public class MyAsyncProcessor {

    enum States {
        Idle,
        Read,
        Write
    }

    ExecutorService pool;
    private Map<Integer, States> socketStates = new HashMap<>();

    public MyAsyncProcessor() {
    }

    public static class MyTask implements Runnable {

        @Override
        public void run() {
            System.out.println("execute task");
        }

        private int secondsToRead;
        private int secondsToWrite;

        public void setTimeToRead(int secondsToRead) {
            this.secondsToRead = secondsToRead;
        }

        public void setTimeToWrite(int secondsToWrite) {
            this.secondsToWrite = secondsToWrite;
        }
    }

    public static void main(String[] args) throws IOException {
        new MyAsyncProcessor().process();
    }

    public void process() throws IOException {
        pool = Executors.newFixedThreadPool(2);
        InetAddress host = InetAddress.getByName("localhost");
        Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(host, 9876));
        final SelectionKey register1 = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        register1.attach(serverSocketChannel);
        SelectionKey key;
        while (true) {
            if (selector.select() > 0) {
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> i = selectedKeys.iterator();
                while (i.hasNext()) {
                    key = i.next();
                    i.remove();
                    MyTask task = new MyTask();
                    if (!key.isValid()) {
                        key.cancel();
                        continue;
                    }
                    if (key.isAcceptable()) {
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        socketChannel.configureBlocking(false);
                        System.out.println("Channel hashCode: " + socketChannel.hashCode());
                        final SelectionKey register = socketChannel.register(selector, SelectionKey.OP_READ);
                        register.attach(key.attachment());
                        System.out.println("Connection accepted from: " + socketChannel.getLocalAddress());
                    }
                    if (key.isReadable()) {
                        System.out.println("Readable");
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        socketStates.put(socketChannel.hashCode(), States.Read);


                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        try {
                            final int read = socketChannel.read(byteBuffer);
                            if (read > 0) {
                                System.out.println("receive message form client:" + new String(byteBuffer.array(), 0, read - 1));
                                task.setTimeToRead(10);
                                task.setTimeToWrite(10);
                                pool.execute(task);
                            }
                            socketChannel.register(selector, SelectionKey.OP_WRITE);
                        } catch (Exception e) {
                            socketChannel.close();
                        }
                    }
                    if (key.isValid() && key.isWritable()) {
                        System.out.println("Writable");
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        try {
                            socketChannel.write(ByteBuffer.wrap("hello world!".getBytes(StandardCharsets.UTF_8)));
                            socketChannel.register(selector, SelectionKey.OP_READ);
                        } catch (IOException e) {
                            socketChannel.close();
                        }
                    }
                }
            }
        }
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.