如何在JGROUPS中禁用FIFO和重新传输协议?

问题描述 投票:1回答:1

我是Jgroups的新手,但基于我对文档的理解,它的一个主要优点是可以禁用一个不需要/想要的协议元素(以获得更好的性能)。但是,当我试图禁用任何与“FIFO”交付顺序和“保证交付”有关的内容时,我收到以下错误:

Exception in thread "main" java.lang.Exception: events [GET_DIGEST SET_DIGEST ] are required by GMS, but not provided by any of the protocols below it
    at org.jgroups.stack.Configurator.sanityCheck(Configurator.java:320)
    at org.jgroups.stack.Configurator.connectProtocols(Configurator.java:197)
    at org.jgroups.stack.Configurator.setupProtocolStack(Configurator.java:115)
    at org.jgroups.stack.Configurator.setupProtocolStack(Configurator.java:49)
    at org.jgroups.stack.ProtocolStack.setup(ProtocolStack.java:475)
    at org.jgroups.JChannel.init(JChannel.java:965)
    at org.jgroups.JChannel.<init>(JChannel.java:148)
    at org.jgroups.JChannel.<init>(JChannel.java:130)
    at RpcDispatcherTest.start(RpcDispatcherTest.java:29)
    at RpcDispatcherTest.main(RpcDispatcherTest.java:83)

我的xml配置文件如下所示:

<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns="urn:org:jgroups"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd">
    <TCP bind_addr="127.0.0.1"
            bind_port="7800"
         recv_buf_size="${tcp.recv_buf_size:130k}"
         send_buf_size="${tcp.send_buf_size:130k}"
         max_bundle_size="64K"
         sock_conn_timeout="300"
         enable_diagnostics="true"
         thread_pool.min_threads="10"
         thread_pool.max_threads="20"
         thread_pool.keep_alive_time="30000"
         stats = "false"
    />
    <TCPPING initial_hosts="127.0.0.1[7800]"
             port_range="0" stats = "false"/>
    <MERGE3  min_interval="10000"
             max_interval="30000" stats = "false"/>
    <FD_SOCK stats = "false"/>
    <FD timeout="3000" max_tries="3" stats = "false" />
    <VERIFY_SUSPECT timeout="1500" stats = "false" />

    <pbcast.GMS print_local_addr="true" join_timeout="2000"
                view_bundling="true" stats = "false"/>
</config>

如果我注释掉最后一个协议(pgcast.GMS一个),我不会得到错误并且它“似乎”可以在单个Windows VM上工作(在Google Cloud上),但如果我启动第二个jvm(仍然在相同的Windows机器),然后我注意到每个jvm都在一个“独立”的集群中,并没有看到另一个。 (在“普通tcp.xml”配置中(包括NACKA和XXXX协议),例如

<pbcast.NAKACK2 use_mcast_xmit="false"
                   discard_delivered_msgs="true"
                    stats = "false"/>
    <UNICAST3 stats = "false"/>
    <!--<pbcast.STABLE desired_avg_gossip="50000"-->
                   <!--max_bytes="4M"/>-->

一切都按预期工作,即如果我在同一个Windows机器上启动第二个JVM,第二个JVM似乎加入了第一个JVM的集群,因此第二个JVM上发送的消息出现在第一个JVM中,反之亦然。

那么,有没有办法禁用UNICAST3和NAKACK2(基本上,任何与FIFO排序或保证消息传递有关的事情),但仍然包括确保“工作完整集群”所需的逻辑,该逻辑还捕获哪些节点离开/加入集群(例如pbcast.GMS逻辑?)我无法弄清楚如何......

(背景信息:我正在努力提高性能,我怀疑性能稍慢是因为“保证消息传递”和“FIFO”协议,我认为我不需要因为a)我正在使用TCP和b)消息可以按任何顺序发送。 (也就是说,我假设TCP,几乎按照定义,确保消息传递,因为这是至关重要的。)我也在谷歌云上,我认为TCP逻辑的“保证”方面运行在高度优化无论如何都不允许使用路由器和多播,这抑制了UDP组播的一个主要优点。)

最后(我不认为这是必需的),但这是我的测试代码(这只是对JGroups 4.0附带的演示的一点修改):

import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.blocks.*;
import org.jgroups.util.RspList;
import org.jgroups.util.Util;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class RpcDispatcherTest {
    JChannel channel;
    RpcDispatcher disp;
    RspList rsp_list;
    String             props = "gs-tcp.xml"; // set by application

    public static int print(int number) throws Exception {
        return number;
    }

    public void start() throws Exception {

        RequestOptions opts=new RequestOptions(ResponseMode.GET_FIRST, 1000);
        channel=new JChannel(props);
        disp=new RpcDispatcher(channel, this);
        channel.connect("RpcDispatcherTestGroup");

        final Address myCurAddress = channel.getAddress();
        System.out.println("Currrent address is " + myCurAddress + " all members address are " + channel.getView().getMembers().toString());


        final long t1 = System.currentTimeMillis();
        final IntStream x = IntStream.range(0, 1_000_000);
        final AtomicInteger cnt = new AtomicInteger();
        x.asLongStream().parallel().forEach(l -> {
            try {
            final int i = (int) l;
                if (i % (100) == 0) {
                    System.out.println("At " + i + " on thread  + " + Thread.currentThread().getId());
                }


            final MethodCall call=new MethodCall(getClass().getMethod("print", int.class));
            call.setArgs(i);
            final CompletableFuture<Integer> response = disp.<Integer>callRemoteMethodWithFuture(myCurAddress, call, opts);
            response.thenAccept(integer -> {
                    if (integer % (1024*8) == 0) {
                        System.out.println("At " + cnt.incrementAndGet() + " Execution time for " + integer + " is " + (System.currentTimeMillis() - t1)/1000f);
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        });

     //   Util.close(disp, channel);
    }

    public static void main(String[] args) throws Exception {
        new RpcDispatcherTest().start();
    }
}
java jgroups
1个回答
0
投票

我没有找到一种方法来禁用所有可靠的消息传输协议,因为至少GMS协议依赖于NAKACK2GMS要求上述协议提供由GET_DIGEST提供的NAKACK2事件。

但删除UNICAST3协议确实有很大帮助,现在性能要好得多。

© www.soinside.com 2019 - 2024. All rights reserved.