我正在寻找一种方法来强制 JGroups 使用特定服务器作为协调器,如果该服务器不存在,则选择一个新的协调器,直到指定的协调器重新加入集群并接管协调器。
在这种情况下,我们通过协调器监听更新主题将一些信息推送到集群中,但是获取和处理这些更新可能会占用大量资源,因此我们不希望它向外界提供任何服务。因此,在集群前面的负载均衡器中,我们将其设置为不发送到协调器。但是因为协调器是随机选举的,所以我们基本上需要关闭集群,直到只有一台机器在其中,然后重新启动集群的其余部分。
目前还没有办法做到这一点。 Jgroups 花费了大量时间来确保协调器可以是组中的任何节点。维护和监控组成员列表健康状况的所有任务都由组中的所有成员共享,以确保协调员职责不会过多影响协调员的绩效。标准 GMS(Group MembershipService)协议栈类负责协调器选择。目前它只是查看列表中的第一个主机。
要实现此行为,您必须实现自己的协议栈。然而,其他人可能已经对这个问题采取了行动。我建议在 jgroups 邮件列表 上发帖并提出同样的问题。
刚刚偶然发现了这篇文章。在 JGroups 中有一种简单且标准的方法可以做到这一点:[1]。它本质上是让用户代码控制视图生成。
[1] http://www.jgroups.org/manual4/index.html#MembershipChangePolicy
您可以将所需的节点设置为协调器。 : github 示例
我添加同步块以在所有节点完成更改并完成代码:
public static final String GMS_DELTA_VIEW_FIELD_NAME = "use_delta_views";
/**
* Change coordinator to {@code desiredCoordinator}. Must be invoked from coordinator.
* @param desiredCoordinator
* @return {@code true} if changes success, {@code false} overwise
*/
boolean changeCoordinator(JChannel currentChannel, Address desiredCoordinator) {
if(!Util.isCoordinator(currentChannel.getAddress)) {
throw new RuntimeException("The current node is not coordinator.");
}
ArrayList<Address> newMembersOrder = Lists.newArrayList(currentView.getMembers());
// Switch desired node to first place
Collections.swap(newMembersOrder, 0, newMembersOrder.indexOf(desiredCoordinator));
// Create new view
long newId = currentView.getViewId().getId() + 1;
View newView = new View(newMembersOrder.get(0), newId, newMembersOrder);
GMS gms = (GMS)clusterChannel.getProtocolStack().findProtocol(GMS.class);
CustomProtocol protocol = new CustomProtocol(newMembersOrder.stream()
.filter(item -> !item.equals(currentChannel.getAddress()))
.collect(Collectors.toSet()));
boolean oldUseDeltaViews = (Boolean)gms.getValue(GMS_DELTA_VIEW_FIELD_NAME);
try {
// Disable using_delta_views at GMS
gms.setValue(GMS_DELTA_VIEW_FIELD_NAME, false);
// Insert custom protocol below GMS for synchronizing with VIEW_ACK events
currentChannel.getProtocolStack().insertProtocolInStack(protocol, gms, ProtocolStack.BELOW);
gms.castViewChange(newView, null, newMembersOrder);
// Wait no more than 30 seconds to all VIEW_ACK responses
if (!protocol.collector.waitForAllAcks(TimeUnit.SECONDS.toMillis(30))) {
return false;
}
return true;
}
finally {
// Repair old state
gms.setValue(GMS_DELTA_VIEW_FIELD_NAME, oldUseDeltaViews);
currentChannel.getProtocolStack().removeProtocol(protocol);
}
}
private class CustomProtocol extends Protocol implements UpHandler {
AckCollector collector;
public CustomProtocol(Collection<Address> waitedAddresses) {
collector = new AckCollector(waitedAddresses);
}
@Override
public Object up(Event evt) {
if(evt.getType() == Event.MSG) {
final Message msg=(Message)evt.getArg();
GmsHeader hdr=(GmsHeader)msg.getHeader(proto_id);
if(hdr != null && hdr.getType() == GmsHeader.VIEW_ACK) {
collector.ack(msg.getSrc());
}
}
return super.up(evt);
}
}