服务器-客户端程序中的多线程问题(Java)

问题描述 投票:0回答:1

我一直在尝试用Java实现一个简单的多线程服务器-客户端程序。这是 Hyperskill 中的一个项目阶段(在此处找到它)。

阶段目标如下:

服务器应将数据库保存在硬盘上的 db.json 文件中,该文件应作为 JSON 文件存储在 /server/data 文件夹中。 在服务器上使用执行器以同时处理多个请求。对数据库文件的写入应受到描述中所述的锁的保护。 实现从文件读取请求的能力。如果 -in 参数后跟所提供的文件名,则从该文件读取请求。该文件将存储在/client/data中。

我有 2 个包,分别是客户端和服务器,每个包中有 4 个不同的类。请在下面找到我的完整源代码。您还可以在源代码后找到自动化测试结果。

// client.Main

package client;

public class Main {

    public static void main(String[] args) {
        ClientInitializer clientInitializer = new ClientInitializer(args);
        clientInitializer.start();
    }
}

// client.ClientInitializer
package client;

import com.google.gson.Gson;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Scanner;

/**
 * Turns command-line arguments into a {@link Request} and sends it with a {@link Client}.
 *
 * <p>Recognised options: {@code -in <file>} (read the request as JSON from
 * {@code client/data/<file>}), {@code -t <type>}, {@code -k <key>} and {@code -v <value>}.
 */
public class ClientInitializer {

    private final List<String> args;

    /**
     * @param args command-line arguments as received by {@code main}
     */
    public ClientInitializer(String[] args) {
        this.args = Arrays.asList(args);
    }

    /**
     * Builds the request and runs the client.
     *
     * <p>If {@code -in} names a readable, non-empty request file, that request is sent.
     * Otherwise the request is built from {@code -t}/{@code -k}/{@code -v}. When neither
     * source yields a request type, an error is reported and nothing is sent.
     */
    public void start() {
        String fileName = getInput();
        if (!fileName.isEmpty()) {
            Request fileRequest = readRequestFromFile(fileName);
            if (fileRequest != null) {
                new Client(fileRequest).run();
                return;
            }
        }
        String type = getType();
        if (type == null) {
            // Without a type there is no meaningful request; do not send garbage to the server.
            System.err.println("No request to send: pass -t <type> or -in <file> with a valid request.");
            return;
        }
        Request request = new Request(type, getKey(), getValue());
        new Client(request).run();
    }

    /**
     * Reads a JSON request from {@code client/data/<fileName>}.
     *
     * @return the parsed request, or {@code null} if the file cannot be read or is empty
     */
    @SuppressWarnings("CallToPrintStackTrace")
    private Request readRequestFromFile(String fileName) {
        // Paths.get with separate segments keeps the path portable across operating systems.
        try (Scanner fileScanner = new Scanner(Paths.get("client", "data", fileName))) {
            StringBuilder jsonBuilder = new StringBuilder();
            while (fileScanner.hasNextLine()) {
                jsonBuilder.append(fileScanner.nextLine());
            }
            // Gson returns null for empty input; the caller treats that as "no request".
            return new Gson().fromJson(jsonBuilder.toString(), Request.class);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /** Returns the file name following {@code -in}, or an empty string when none is given. */
    private String getInput() {
        String fileName = valueAfter("-in");
        return fileName == null ? "" : fileName;
    }

    /** Returns the request type following {@code -t}, or {@code null} when none is given. */
    private String getType() {
        return valueAfter("-t");
    }

    /** Returns the key following {@code -k}, or {@code null} when none is given. */
    private String getKey() {
        return valueAfter("-k");
    }

    /** Returns the value following {@code -v}, or {@code null} when none is given. */
    private String getValue() {
        return valueAfter("-v");
    }

    /**
     * Looks up the argument that directly follows {@code flag}.
     *
     * @return that argument, or {@code null} if the flag is absent or is the last argument
     */
    private String valueAfter(String flag) {
        int flagIndex = args.indexOf(flag);
        boolean hasValue = flagIndex != -1 && flagIndex + 1 < args.size();
        return hasValue ? args.get(flagIndex + 1) : null;
    }
}
// client.Request

package client;

/**
 * Immutable request sent from the client to the server.
 *
 * <p>Holds the request type plus an optional key and value; any of them may be {@code null}.
 */
public class Request {

    private final String type;
    private final String key;
    private final String value;

    /**
     * Creates a request from its three parts.
     *
     * @param type  request type
     * @param key   key the request refers to, may be {@code null}
     * @param value value carried by the request, may be {@code null}
     */
    public Request(String type, String key, String value) {
        this.type = type;
        this.key = key;
        this.value = value;
    }

    /**
     * Copy constructor.
     *
     * @param request request whose type, key and value are copied
     */
    public Request(Request request) {
        this(request.getType(), request.getKey(), request.getValue());
    }

    /** @return the request type */
    public String getType() {
        return this.type;
    }

    /** @return the key, or {@code null} if none was given */
    public String getKey() {
        return this.key;
    }

    /** @return the value, or {@code null} if none was given */
    public String getValue() {
        return this.value;
    }
}
// client.Client

package client;

import com.google.gson.Gson;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.Socket;

/**
 * Sends a single {@link Request} to the server as JSON and prints the server's reply.
 */
public class Client {

    private static final String ADDRESS = "127.0.0.1";
    private static final int PORT = 23456;
    private final Request request;

    /**
     * @param request the request to send when {@link #run()} is called
     */
    public Client(Request request) {
        this.request = request;
    }

    /**
     * Connects to the server, writes the request as a UTF string and prints the response.
     *
     * <p>If the server closes the connection before replying, a message is printed instead
     * of the response. Other I/O failures are reported with a stack trace.
     */
    @SuppressWarnings("CallToPrintStackTrace")
    public void run() {
        System.out.println("Client started!");
        try (Socket socket = new Socket(ADDRESS, PORT);
             DataInputStream input = new DataInputStream(socket.getInputStream());
             DataOutputStream output = new DataOutputStream(socket.getOutputStream())
        ) {
            String sentJson = new Gson().toJson(request);
            output.writeUTF(sentJson);
            output.flush();
            System.out.println("Sent: " + sentJson);
            // Socket.isOutputShutdown() only reports a local shutdownOutput() call, so it cannot
            // detect a remote close; the only reliable signal is EOF while reading the reply.
            try {
                String receivedJson = input.readUTF();
                System.out.println("Received: " + receivedJson);
            } catch (EOFException e) {
                System.out.println("Server closed the connection unexpectedly.");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

// server.Main

package server;

public class Main {

    public static void main(String[] args) {
        Server server = new Server();
        server.run();
    }
}

// server.Server

package server;

import client.Request;
import com.google.gson.Gson;

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class Server {

    private static final int PORT = 23456;
    private final Gson gson;
    private final ReadWriteLock lock;
    private final ExecutorService executor = Executors.newFixedThreadPool(4);
    private volatile boolean isRunning = true;

    public Server() {
        writeDbToFile();
        this.gson = new Gson();
        this.lock = new ReentrantReadWriteLock();
    }

    @SuppressWarnings("CallToPrintStackTrace")
    public void run() {
        System.out.println("Server started!");
        while (isRunning) {
            try (ServerSocket serverSocket = new ServerSocket(PORT)) {
                try (Socket acceptSocket = serverSocket.accept();
                     DataInputStream input = new DataInputStream(acceptSocket.getInputStream());
                     DataOutputStream output = new DataOutputStream(acceptSocket.getOutputStream())
                ) {
                    String receivedJson = input.readUTF();
                    if (receivedJson.contains("exit")) {
                        executor.submit(() -> {
                           handleExitRequest(output);
                        });
                        isRunning = false;
                    } else {
                        executor.submit(() -> {
                            handleRequest(receivedJson, output);
                        });
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void handleExitRequest(DataOutputStream output) {
        String sentJson = gson.toJson(new Response("OK"));
        try {
            output.writeUTF(sentJson);
        } catch (IOException e) {
            throw new RuntimeException("Server side IOException");
        }
    }

    private void handleRequest(String receivedJson, DataOutputStream output) {
        Request request = gson.fromJson(receivedJson, Request.class);
        if (receivedJson.contains("get")) {
            Database db = new Database();
            Lock readLock = lock.readLock();
            String key = request.getKey();
            try {
                output.writeUTF(gson.toJson(db.getInformation(key)));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            readLock.unlock();
        } else if (receivedJson.contains("set")) {
            Database db = new Database();
            Lock writeLock = lock.writeLock();
            String key = request.getKey();
            String value = request.getValue();
            try {
                output.writeUTF(gson.toJson(db.setInformation(key, value)));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            writeDbToFile();
            writeLock.unlock();
        } else if (receivedJson.contains("delete")) {
            Database db = new Database();
            Lock writeLock = lock.writeLock();
            String key = request.getKey();
            try {
                output.writeUTF(gson.toJson(db.deleteInformation(key)));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            writeDbToFile();
            writeLock.unlock();
        }
    }

    @SuppressWarnings("CallToPrintStackTrace")
    private void writeDbToFile() {
        Database db = new Database();
        try (FileWriter fileWriter = new FileWriter(System.getProperty("user.dir") + "/src/server/data/db.json", true)) {
            for (var entry : db.getDb().entrySet()) {
                fileWriter.write(entry.getKey() + ":" + entry.getValue() + "\n");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
// server.Database

package server;

import java.util.HashMap;
import java.util.Map;

public class Database {

    private final Map<String, String> db;
    private static final String successMessage = "OK";
    private static final String errorMessage = "ERROR";
    private static final String noSuchKeyMessage = "No such key";

    public Database() {
        this.db = new HashMap<>(1000);
    }

    public Map<String, String> getDb() {
        return db;
    }

    public Response getInformation(String key) {
        return db.containsKey(key) ? new Response(successMessage, db.get(key), false) :
                new Response(errorMessage, noSuchKeyMessage);
    }

    public Response setInformation(String key, String value) {
        db.put(key, value);
        return new Response(successMessage);
    }

    public Response deleteInformation(String key) {
        if (db.containsKey(key)) {
            db.remove(key);
            return new Response(successMessage);
        } else {
            return new Response(errorMessage, noSuchKeyMessage);
        }
    }
}
// server.Response

package server;

/**
 * Immutable server reply: a status string plus either a failure reason or a value.
 *
 * <p>The class has no accessors; the fields are declared in the order
 * {@code response}, {@code reason}, {@code value}.
 */
public class Response {

    private final String response;
    private final String reason;
    private final String value;

    /**
     * Creates a reply that carries only a status.
     *
     * @param response status text
     */
    public Response(String response) {
        this(response, (String) null, (String) null);
    }

    /**
     * Creates a reply with a status and a reason.
     *
     * @param response status text
     * @param reason   explanation attached to the status
     */
    public Response(String response, String reason) {
        this(response, reason, (String) null);
    }

    /**
     * Creates a reply with a status and a value.
     *
     * @param response status text
     * @param value    value attached to the status
     * @param isReason not read; it only distinguishes this overload from the
     *                 status-and-reason constructor
     */
    public Response(String response, String value, boolean isReason) {
        this(response, (String) null, value);
    }

    /** Assigns all three fields; the public constructors delegate here. */
    private Response(String response, String reason, String value) {
        this.response = response;
        this.reason = reason;
        this.value = value;
    }
}

测试结果:

Start test 1
Server started!
Client started!
Sent: {"type":"exit"}
Server closed the connection unexpectedly.

Start test 2

Start test 3
Server started!
Client started!
Sent: {"type":"get","key":"1"}
Server closed the connection unexpectedly.

基于自动化测试,我在 catch 块中放置了一些打印语句来查看问题到底是什么,并发现客户端无法读取从服务器发送的响应。我认为这可能是由于其中一个线程在发送响应之前关闭服务器引起的,但由于我对多线程的经验几乎为零,所以我不太确定。

程序在读取和写入文件方面似乎没有任何问题。老实说,我不太确定目前的多线程行为是否构建正确,也不太确定这里对锁的使用是否正确。

我也尝试在互联网上找到类似的问题,并根据这里和那里的建议稍微改变我的源代码,但无济于事。

请原谅任何潜在的设计和编程问题,因为我今年夏天才开始学习编程。非常感谢任何帮助。

java multithreading locking executorservice threadpoolexecutor
1个回答
0
投票

我通过在接受客户端连接时去掉 try-with-resources 解决了这个问题:原来的 try-with-resources 会在主线程离开代码块时立即关闭套接字及其输入/输出流,而此时提交给执行器的任务往往还没来得及写回响应,所以客户端读到的是 EOF。这个做法是我在这里读到的:使用套接字和线程时的 EOFException / SocketException(1337joe 的答案)。

去掉 try-with-resources 之后,我的代码立即通过了测试。我还把 while 循环移到了创建 ServerSocket 对象之后,以确保由同一个服务器套接字接受多个客户端,而不是每次循环都尝试创建新的服务器套接字(这本身也可能引发问题)。以下是具体改动,供日后参考:

// I used try-with-resources here in the first implementation
while (isRunning) {
    try (ServerSocket serverSocket = new ServerSocket(PORT)) {
        try (Socket acceptSocket = serverSocket.accept();
             DataInputStream input = new DataInputStream(acceptSocket.getInputStream());
             DataOutputStream output = new DataOutputStream(acceptSocket.getOutputStream()))
             // ... rest of the code

// I removed try-with-resources and changed it as follows
    try (ServerSocket serverSocket = new ServerSocket(PORT)) {
        while (isRunning) {
            try {
                Socket acceptSocket = serverSocket.accept();
                DataInputStream input = new DataInputStream(acceptSocket.getInputStream());
                DataOutputStream output = new DataOutputStream(acceptSocket.getOutputStream());
                // ... rest of the code
© www.soinside.com 2019 - 2024. All rights reserved.