我正在尝试使用 ZeroMQ 在 Java 中实现分布式系统。我创建了一个 ServerNode 和一个 Broker 类。 ServerNode的相关部分如下:
public class ServerNode {
...
private final ZMQ.Socket dealerSocket;
private final ConsistentHashing consistentHashing;
...
public ServerNode(String serverAddress, String brokerAddress) {
this.serverAddress = serverAddress;
this.brokerAddress = brokerAddress;
this.context = new ZContext();
this.dealerSocket = context.createSocket(SocketType.DEALER);
this.dealerSocket.bind(serverAddress.substring(0, serverAddress.length() - 1) + "1"); //binds to port 5001
...
}
public void start() {
joinRing();
executorService.submit(this::checkNeighbourHeatbeat);
executorService.submit(this::listenForMessages);
}
private void joinRing() {
ZMsg msg = new ZMsg();
msg.addString("JOIN");
msg.addString(this.serverAddress);
ZMQ.Socket brokerSocket = context.createSocket(SocketType.DEALER);
brokerSocket.connect(brokerAddress);
msg.send(brokerSocket);
ZMsg response = ZMsg.recvMsg(brokerSocket); //not sure if this is correct
System.out.println("RECEIVED RESPONSE: " + response);
if (response != null) {
String ringState = response.popString();
this.ring = utils.stringToTreeMap(ringState);
System.out.println("Received ring state: " + this.ring);
ZMQ.Socket nodeDealerSocket = context.createSocket(SocketType.DEALER);
for (String nodeAddress : this.ring.values()) {
if (!Objects.equals(nodeAddress, this.serverAddress)) {
nodeDealerSocket.connect(nodeAddress.substring(0, nodeAddress.length() - 1) + "1");
msg.send(nodeDealerSocket);
nodeDealerSocket.disconnect(nodeAddress.substring(0, nodeAddress.length() - 1) + "1");
}
}
nodeDealerSocket.close();
}
brokerSocket.close();
}
public static void main(String[] args) {
String serverAddress;
String brokerAddress = "tcp://127.0.0.1:5000";
if (args.length > 0) {
serverAddress = args[0];
} else {
System.out.println("Enter a valid server address (e.g., tcp://127.0.0.2:5000): ");
Scanner scanner = new Scanner(System.in);
serverAddress = scanner.nextLine();
scanner.close();
}
ServerNode node = new ServerNode(serverAddress, brokerAddress);
node.start();
}
...
}
经纪人:
public class Broker {
private ConsistentHashing consistentHashing;
private ZContext context;
private ZMQ.Socket serverRoutSocket; // REP socket for "JOIN" requests
private TreeMap<Integer, String> ring;
public Broker() throws NoSuchAlgorithmException {
this.consistentHashing = new ConsistentHashing(1);
this.context = new ZContext();
this.serverRoutSocket = context.createSocket(SocketType.ROUTER);
this.serverRoutSocket.bind("tcp://127.0.0.1:5000");
this.ring = new TreeMap<>();
}
public void start() {
System.out.println("BROKER STARTED");
Poller poller = context.createPoller(1); //! LATEr ADD ROUTER CLIENT
poller.register(serverRoutSocket, Poller.POLLIN);
while (!Thread.currentThread().isInterrupted()) {
poller.poll();
if (poller.pollin(0)) {
ZMsg serverRoutReq = ZMsg.recvMsg(serverRoutSocket);
if (serverRoutReq != null) {
handleServerRoutRequest(serverRoutReq);
}
}
}
}
private void handleServerRoutRequest(ZMsg request) {
System.out.println("REQUEST: " + request);
String dealerIdentity = request.popString();
String header = request.popString();
String server = null;
ZMsg response = new ZMsg();
System.out.println("identity: " + dealerIdentity);
response.addString(dealerIdentity);
switch (header) {
case "JOIN" -> {
server = request.popString();
consistentHashing.addServer(server, ring);
response.addString(ring.toString());
System.out.println("RESPONSE SENT: " + response);
response.send(serverRoutSocket);
}
case "REMOVE" -> {
System.out.println("SERVER REMOVED: " + ring);
server = request.popString();
consistentHashing.removeServer(server, ring);
}
default -> {
}
//ignore
}
}
public static void main(String[] args) {
try {
Broker broker = new Broker();
broker.start();
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
}
}
}
正如您在 ServerNode 类的 joinRing 方法中看到的,我将 DEALER 套接字连接到 Broker 的 ROUTER 套接字。然后,我发送一条由“JOIN”组成的消息,然后发送服务器地址,该消息随后由代理接收并在 handleServerRoutRequest 方法中进行相应处理。然后,代理应使用当前环状态响应服务器,服务器将使用该状态来查找系统中存在的所有节点。之后,服务器只需向每个节点发送一条加入消息,以便它们可以更新自己的环。
将加入请求发送到代理后,我预计代理会返回环,并且消息似乎是使用身份正确创建的,后跟负载(例如:[ 00C1D76C28,{1114376065=tcp://127.0.0.0])。 0.2:5000} ]),服务器卡在
ZMsg response = ZMsg.recvMsg(brokerSocket);
线上。为什么没有收到回复?
Java的0MQ接口并不是最好的。我使用过 C++ 和 Python 的接口,效果很好。 另外,看看你的代码,我可以看出它非常干净,一点也不复杂,这让我认为这不是你的错。 您是否尝试过联系 0MQ 的支持人员?我听说他们有一个非常好的热线电话,您可以在工作时间拨打,他们也许可以帮助您解决问题。 如果这不起作用,我建议您在使用 java 进行任何与套接字相关的工作时使用 java.nio 。 顺便问一下,这里选择 Java 有什么特别的原因吗?这就像搬起石头砸自己的脚,因为有很多更好的解决方案,例如 PHP、Perl 或 VBA。