创建用于命令处理的套接字通信的线程

问题描述 投票:0回答:1

我正在开发一个涉及套接字通信的Java应用程序,我正在尝试创建一个Thread子类,将其称为CommandThread,它从套接字读取命令。有效命令包括“DELAY n”和“QUIT”。

我有一些问题和疑虑:

如何改进 processCommand 方法中的错误处理? 有没有更好的方法来构建从套接字读取和处理命令的代码? 关于在多线程环境中处理套接字的最佳实践有什么一般建议吗? 我感谢您提供的任何见解、建议或更正!谢谢你。

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

public class CommandThread extends Thread {
    private final Socket clientSocket;
    private boolean isRunning;

    public CommandThread(Socket clientSocket) {
        this.clientSocket = clientSocket;
        this.isRunning = true;
    }

    @Override
    public void run() {
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
            PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(), true);

            while (isRunning) {
                String command = reader.readLine();
                if (command != null) {
                    processCommand(command, writer);
                }
            }

            // Clean up resources
            reader.close();
            writer.close();
            clientSocket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void processCommand(String command, PrintWriter writer) {
        if (command.startsWith("DELAY ")) {
            try {
                int delayTime = Integer.parseInt(command.substring(6));
                Thread.sleep(delayTime);
                writer.println("OK");
            } catch (InterruptedException | NumberFormatException e) {
                writer.println("ERROR");
            }
        } else if (command.equals("QUIT")) {
            writer.println("BYE");
            isRunning = false;
        } else {
            writer.println("INVALID COMMAND");
        }
    }

    public static void main(String[] args) {
        // Usage example:
        // In your server code, when a new client connects, create a CommandThread for that client.
        // CommandThread commandThread = new CommandThread(clientSocket);
        // commandThread.start();
    }
}

java sockets java-threads
1个回答
0
投票
  • 如何改进 processCommand 方法中的错误处理?

    使用

    try-with-resources

  • 是否有更好的方法来构造从套接字读取和处理命令的代码?关于在多线程环境中处理套接字的最佳实践有什么一般建议吗?

    使用线程池而不是新建线程,这样可以更好地重用服务器资源

public class CommandHandler implements Runnable {
    private final Socket clientSocket;
    private boolean isRunning;

    public CommandHandler(Socket clientSocket) {
        this.clientSocket = clientSocket;
        this.isRunning = true;
    }

    @Override
    public void run() {
        try (Socket socket = clientSocket;
             BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
             PrintWriter writer = new PrintWriter(socket.getOutputStream(), true)) {

            while (isRunning) {
                String command = reader.readLine();
                if (command != null) {
                    String result = processCommand(command);
                    writer.println(result);
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private String processCommand(String command) {
        if (command.startsWith("DELAY ")) {
            try {
                int delayTime = Integer.parseInt(command.substring(6));
                Thread.sleep(delayTime);
                return "OK";
            } catch (InterruptedException | NumberFormatException e) {
                return "ERROR";
            }
        } else if (command.equals("QUIT")) {
            isRunning = false;
            return "BYE";
        } else {
            return "INVALID COMMAND";
        }
    }

    public static void main(String[] args) {
        // Usage example:
        /*
        // global thread pool
        ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 40,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(500));

        ServerSocket serverSocket = new ServerSocket(8080);
        while (true) {
            Socket clientSocket = serverSocket.accept();
            CommandHandler commandHandler = new CommandHandler(clientSocket);
            executor.execute(commandHandler);
        }
        */
    }
}

您的代码是用BIO模式编写的,使用NIO或AIO可以获得更好的性能。

使用Reactor模式来编写你的代码,你可以从Netty学习更多内容[https://netty.io/index.html]。

这是一个用 Java nio 编写的简单示例,如果使用 Netty 可以做得更好。

public class ServerSocketTest {

    /**
     * server ip
     */
    private final String HOST = "127.0.0.1";

    /**
     * server port
     */
    private final int PORT = 8080;

    @Test
    public void testServer() throws IOException {

        // create Selector
        Selector selector = Selector.open();

        // create ServerSocketChannel and listen port
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
        serverSocketChannel.configureBlocking(false);

        // register Channel
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        // create buffer
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        Charset charset = StandardCharsets.UTF_8;
        CharsetDecoder charsetDecoder = charset.newDecoder();
        CharBuffer charBuffer = CharBuffer.allocate(1024);

        while (true) {
            // block and wait
            if (selector.select() == 0) {
                continue;
            }

            // get ready Channel set
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();

            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                // remove current key from set
                iterator.remove();


                // if read event
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    // accept connection from client
                    SocketChannel clientChannel = server.accept();
                    clientChannel.configureBlocking(false);
                    // register with Selector,wait for connections
                    clientChannel.register(selector, SelectionKey.OP_READ);
                    System.out.println("Accepted connection from " + clientChannel);
                }

                // if read event
                if (key.isReadable()) {
                    SocketChannel clientChannel = (SocketChannel) key.channel();
                    buffer.clear();
                    // read data
                    StringBuilder stringBuilder = new StringBuilder();
                    while (clientChannel.read(buffer) != -1 || buffer.position() != 0) {
                        buffer.flip();
                        charsetDecoder.decode(buffer, charBuffer, true);
                        charBuffer.flip();
                        stringBuilder.append(charBuffer);
                        charBuffer.clear();
                        buffer.compact();
                    }

                    String message = stringBuilder.toString();
                    System.out.println("Received message from " + clientChannel + ": " + message);
                    // write data back
                    clientChannel.write(charset.encode("Echo: " + message));

                    // client close connection
                    clientChannel.close();
                }
            }
        }
    }


    @Test
    public void testClient() {
        ByteBuffer byteBuffer = ByteBuffer.allocate(8192);
        Charset charset = StandardCharsets.UTF_8;
        CharsetDecoder charsetDecoder = charset.newDecoder();

        // connect server
        try (SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT))) {
            // send message to server
            byteBuffer.put(charset.encode("hello server"));
            byteBuffer.flip();
            socketChannel.write(byteBuffer);
            socketChannel.shutdownOutput();
            byteBuffer.clear();

            // receive response from server
            StringBuilder stringBuilder = new StringBuilder();
            while (socketChannel.read(byteBuffer) != -1 || byteBuffer.position() != 0) {
                byteBuffer.flip();
                // decode and store
                CharBuffer decode = charsetDecoder.decode(byteBuffer);
                stringBuilder.append(decode);
                byteBuffer.compact();
            }
            System.out.println("Received message from server " + socketChannel + ": " + stringBuilder);
        } catch (IOException e) {
            Assert.fail();
        }
    }

}

© www.soinside.com 2019 - 2024. All rights reserved.