ZeroMQ DEALER 在 DEALER/ROUTER 配置中未收到来自 ROUTER 的响应

问题描述 投票:0回答:1

我正在尝试使用 ZeroMQ 在 Java 中实现分布式系统。我创建了一个 ServerNode 和一个 Broker 类。 ServerNode的相关部分如下:

public class ServerNode {
    ...
    private final ZMQ.Socket dealerSocket;
    private final ConsistentHashing consistentHashing;
    ...

    public ServerNode(String serverAddress, String brokerAddress) {
        this.serverAddress = serverAddress;
        this.brokerAddress = brokerAddress;

        this.context = new ZContext();

        this.dealerSocket = context.createSocket(SocketType.DEALER);
        this.dealerSocket.bind(serverAddress.substring(0, serverAddress.length() - 1) + "1"); //binds to port 5001
        ...
    }

    public void start() {
        joinRing();
        executorService.submit(this::checkNeighbourHeatbeat);
        executorService.submit(this::listenForMessages);

    }

    private void joinRing() {
        ZMsg msg = new ZMsg();
        msg.addString("JOIN");
        msg.addString(this.serverAddress);
        
        ZMQ.Socket brokerSocket = context.createSocket(SocketType.DEALER);
        brokerSocket.connect(brokerAddress);
        msg.send(brokerSocket);

        ZMsg response = ZMsg.recvMsg(brokerSocket); //not sure if this is correct
        System.out.println("RECEIVED RESPONSE: " + response);
        if (response != null) {
            String ringState = response.popString();
            this.ring = utils.stringToTreeMap(ringState);
            System.out.println("Received ring state: " + this.ring);
            
            ZMQ.Socket nodeDealerSocket = context.createSocket(SocketType.DEALER);
            for (String nodeAddress : this.ring.values()) {
                if (!Objects.equals(nodeAddress, this.serverAddress)) {
                    nodeDealerSocket.connect(nodeAddress.substring(0, nodeAddress.length() - 1) + "1");
                    msg.send(nodeDealerSocket);
                    nodeDealerSocket.disconnect(nodeAddress.substring(0, nodeAddress.length() - 1) + "1");
                }
            }
            nodeDealerSocket.close();
            
        }
        brokerSocket.close();
    }
    
    public static void main(String[] args) {
        String serverAddress;
        String brokerAddress = "tcp://127.0.0.1:5000";

        if (args.length > 0) {
            serverAddress = args[0];
        } else {
            System.out.println("Enter a valid server address (e.g., tcp://127.0.0.2:5000): ");
            Scanner scanner = new Scanner(System.in);
            serverAddress = scanner.nextLine();
            scanner.close();
        }

        ServerNode node = new ServerNode(serverAddress, brokerAddress);
        node.start();
    }
    ...
}

经纪人:

public class Broker {

    private ConsistentHashing consistentHashing;
    private ZContext context;
    private ZMQ.Socket serverRoutSocket; // REP socket for "JOIN" requests
    private TreeMap<Integer, String> ring;

    public Broker() throws NoSuchAlgorithmException {
        this.consistentHashing = new ConsistentHashing(1);
        this.context = new ZContext();

        this.serverRoutSocket = context.createSocket(SocketType.ROUTER);
        this.serverRoutSocket.bind("tcp://127.0.0.1:5000");

        this.ring = new TreeMap<>();
    }

    public void start() {
        System.out.println("BROKER STARTED");
        Poller poller = context.createPoller(1); //! LATEr ADD ROUTER CLIENT
        poller.register(serverRoutSocket, Poller.POLLIN);

        while (!Thread.currentThread().isInterrupted()) {
            poller.poll();

            if (poller.pollin(0)) { 
                ZMsg serverRoutReq = ZMsg.recvMsg(serverRoutSocket);
                if (serverRoutReq != null) {
                    handleServerRoutRequest(serverRoutReq);
                }
            }
        }
    }

    private void handleServerRoutRequest(ZMsg request) {
        System.out.println("REQUEST: " + request);
        String dealerIdentity = request.popString();
        String header = request.popString();
        String server = null;
        ZMsg response = new ZMsg();
        System.out.println("identity: " + dealerIdentity);
        response.addString(dealerIdentity);

        switch (header) {
            case "JOIN" -> {
                server = request.popString();
                consistentHashing.addServer(server, ring);
                response.addString(ring.toString());
                System.out.println("RESPONSE SENT: " + response);
                response.send(serverRoutSocket);
            }
            case "REMOVE" -> {
                System.out.println("SERVER REMOVED: " + ring);
                server = request.popString();
                consistentHashing.removeServer(server, ring);
            }
            default -> {
            }
            //ignore
        }
    }


    public static void main(String[] args) {
        try {
            Broker broker = new Broker();
            broker.start();
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        }
    }
}

正如您在 ServerNode 类的 joinRing 方法中看到的,我将 DEALER 套接字连接到 Broker 的 ROUTER 套接字。然后,我发送一条由“JOIN”组成的消息,然后发送服务器地址,该消息随后由代理接收并在 handleServerRoutRequest 方法中进行相应处理。然后,代理应使用当前环状态响应服务器,服务器将使用该状态来查找系统中存在的所有节点。之后,服务器只需向每个节点发送一条加入消息,以便它们可以更新自己的环。

将加入请求发送到代理后,我预计代理会返回环,并且消息似乎是使用身份正确创建的,后跟负载(例如:[ 00C1D76C28,{1114376065=tcp://127.0.0.0])。 0.2:5000} ]),服务器卡在

ZMsg response = ZMsg.recvMsg(brokerSocket);
线上。为什么没有收到回复?

java sockets zeromq distributed-system
1个回答
0
投票

Java的0MQ接口并不是最好的。我使用过 C++ 和 Python 的接口,效果很好。 另外,看看你的代码,我可以看出它非常干净,一点也不复杂,这让我认为这不是你的错。 您是否尝试过联系 0MQ 的支持人员?我听说他们有一个非常好的热线电话,您可以在工作时间拨打,他们也许可以帮助您解决问题。 如果这不起作用,我建议您在使用 java 进行任何与套接字相关的工作时使用 java.nio 。 顺便问一下,这里选择 Java 有什么特别的原因吗?这就像搬起石头砸自己的脚,因为有很多更好的解决方案,例如 PHP、Perl 或 VBA。

© www.soinside.com 2019 - 2024. All rights reserved.