Cannot get records from ServiceDiscovery in a Vert.x cluster


I have a Vert.x cluster running on my local machine, with two verticles:

public class AccountVerticle extends AbstractVerticle {

    private ServiceDiscovery discovery;

    private static final Logger logger = LoggerFactory.getLogger(AccountVerticle.class);

    @Override
    public void start(Future<Void> fut) {
        Vertx vertx = Vertx.vertx();
        Router router = Router.router(vertx);
        router.route().handler(BodyHandler.create());
        router.get("/api/account").handler(this::get);

        discovery = ServiceDiscovery.create(vertx);
        createHttpServer(router).subscribe((server, err) -> {
            if (err != null) {
                logger.error("Error while starting server", err);
            } else {
                logger.info("Server has been successfully started");
                discovery.rxPublish(HttpEndpoint.createRecord("accounts", "localhost", 8080, "/api"))
                        .subscribe((rec, error) -> {
                            if (error != null) {
                                logger.error("Error while publishing record", error);
                            } else {
                                logger.info("Record has been successfully published");
                                discovery.rxGetRecords(record -> true)
                                        .subscribe((records, throwable) -> {
                                            logger.info(records.get(0).getLocation());
                                        });
                            }
                        });
            }
        });
    }

    private Single<HttpServer> createHttpServer(Router router) {
        return vertx
                .createHttpServer()
                .requestHandler(router)
                .rxListen(8080);
    }

    private void get(RoutingContext rc) {
        Single.just(new JsonObject("{\"account\":\"test\"}")).subscribe(ActionHelper.ok(rc));
    }
}


public class CustomerVerticle extends AbstractVerticle {

    private ServiceDiscovery discovery;

    private static final Logger logger = LoggerFactory.getLogger(CustomerVerticle.class);

    @Override
    public void start(Future<Void> fut) {
        Vertx vertx = Vertx.vertx();
        Router router = Router.router(vertx);
        router.route().handler(BodyHandler.create());
        router.post("/api/customers/:id/accounts").handler(this::updateCustomerAccount);

        discovery = ServiceDiscovery.create(vertx);
        createHttpServer(router).subscribe((server, err) -> {
            if (err != null) {
                logger.error("Error while starting server", err);
            } else {
                logger.info("Server has been successfully started");
                discovery.rxGetRecords(record -> true)
                            .subscribe((records, throwable) -> {
                                logger.info(records);
                            });
            }
        });
    }

    private Single<HttpServer> createHttpServer(Router router) {
        return vertx
                .createHttpServer()
                .requestHandler(router)
                .rxListen(8090);
    }

    private void updateCustomerAccount(RoutingContext rc) {

        HttpEndpoint.rxGetWebClient(discovery, record -> {
                        return record.getName().equals("accounts");
                    })
                    .flatMap(httpClient -> {
                        return httpClient.get("/api")
                                    .as(BodyCodec.string())
                                    .rxSend();
                    }).subscribe((response, err) -> {
                        logger.info(response);
                        Single.just(new JsonObject("{\"customer\":\"test\"}")).subscribe(ActionHelper.ok(rc));
                    });
    }
}

AccountVerticle publishes an HttpEndpoint record to the ServiceDiscovery. I can see the record in AccountVerticle via discovery.rxGetRecords. But when I try to get the records in CustomerVerticle, I get nothing.

AccountVerticle log:

Connected to the target VM, address: '127.0.0.1:37153', transport: 'socket'
Mar 03, 2019 11:57:02 PM io.vertx.core.impl.launcher.commands.RunCommand
INFO: Starting clustering...
Mar 03, 2019 11:57:02 PM io.vertx.core.impl.launcher.commands.RunCommand
INFO: No cluster-host specified so using address 172.18.0.1
Mar 03, 2019 11:57:03 PM com.hazelcast.instance.AddressPicker
INFO: [LOCAL] [dev] [3.10.5] Prefer IPv4 stack is true.
Mar 03, 2019 11:57:03 PM com.hazelcast.instance.AddressPicker
INFO: [LOCAL] [dev] [3.10.5] Picked [192.168.0.105]:5701, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5701], bind any local is true
Mar 03, 2019 11:57:03 PM com.hazelcast.system
INFO: [192.168.0.105]:5701 [dev] [3.10.5] Hazelcast 3.10.5 (20180913 - 6ffa2ee) starting at [192.168.0.105]:5701
Mar 03, 2019 11:57:03 PM com.hazelcast.system
INFO: [192.168.0.105]:5701 [dev] [3.10.5] Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved.
Mar 03, 2019 11:57:03 PM com.hazelcast.system
INFO: [192.168.0.105]:5701 [dev] [3.10.5] Configured Hazelcast Serialization version: 1
Mar 03, 2019 11:57:03 PM com.hazelcast.instance.Node
INFO: [192.168.0.105]:5701 [dev] [3.10.5] A non-empty group password is configured for the Hazelcast member. Starting with Hazelcast version 3.8.2, members with the same group name, but with different group passwords (that do not use authentication) form a cluster. The group password configuration will be removed completely in a future release.
Mar 03, 2019 11:57:04 PM com.hazelcast.spi.impl.operationservice.impl.BackpressureRegulator
INFO: [192.168.0.105]:5701 [dev] [3.10.5] Backpressure is disabled
Mar 03, 2019 11:57:04 PM com.hazelcast.spi.impl.operationservice.impl.InboundResponseHandlerSupplier
INFO: [192.168.0.105]:5701 [dev] [3.10.5] Running with 2 response threads
Mar 03, 2019 11:57:06 PM com.hazelcast.instance.Node
INFO: [192.168.0.105]:5701 [dev] [3.10.5] Creating MulticastJoiner
Mar 03, 2019 11:57:06 PM com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl
INFO: [192.168.0.105]:5701 [dev] [3.10.5] Starting 4 partition threads and 3 generic threads (1 dedicated for priority tasks)
Mar 03, 2019 11:57:06 PM com.hazelcast.internal.diagnostics.Diagnostics
INFO: [192.168.0.105]:5701 [dev] [3.10.5] Diagnostics disabled. To enable add -Dhazelcast.diagnostics.enabled=true to the JVM arguments.
Mar 03, 2019 11:57:06 PM com.hazelcast.core.LifecycleService
INFO: [192.168.0.105]:5701 [dev] [3.10.5] [192.168.0.105]:5701 is STARTING
Mar 03, 2019 11:57:09 PM com.hazelcast.system
INFO: [192.168.0.105]:5701 [dev] [3.10.5] Cluster version set to 3.10
Mar 03, 2019 11:57:09 PM com.hazelcast.internal.cluster.ClusterService
INFO: [192.168.0.105]:5701 [dev] [3.10.5] 

Members {size:1, ver:1} [
    Member [192.168.0.105]:5701 - 2a3f5096-5a47-4ade-9e0c-ce1d1d3b4e0f this
]

Mar 03, 2019 11:57:09 PM com.hazelcast.core.LifecycleService
INFO: [192.168.0.105]:5701 [dev] [3.10.5] [192.168.0.105]:5701 is STARTED
Mar 03, 2019 11:57:10 PM com.hazelcast.internal.partition.impl.PartitionStateManager
INFO: [192.168.0.105]:5701 [dev] [3.10.5] Initializing cluster partition table arrangement...
Mar 03, 2019 11:57:10 PM io.vertx.core.impl.VertxImpl
WARNING: You're already on a Vert.x context, are you sure you want to create a new Vertx instance?
Mar 03, 2019 11:57:10 PM ua.home.accounts.AccountVerticle
INFO: Server has been successfully started
Mar 03, 2019 11:57:10 PM ua.home.accounts.AccountVerticle
INFO: Record has been successfully published
Mar 03, 2019 11:57:10 PM ua.home.accounts.AccountVerticle
INFO: {"endpoint":"http://localhost:8080/api","host":"localhost","port":8080,"root":"/api","ssl":false}

CustomerVerticle log:

Connected to the target VM, address: '127.0.0.1:36463', transport: 'socket'
Mar 04, 2019 12:04:41 AM io.vertx.core.impl.launcher.commands.RunCommand
INFO: Starting clustering...
Mar 04, 2019 12:04:41 AM io.vertx.core.impl.launcher.commands.RunCommand
INFO: No cluster-host specified so using address 172.18.0.1
Mar 04, 2019 12:04:43 AM com.hazelcast.instance.AddressPicker
INFO: [LOCAL] [dev] [3.10.5] Prefer IPv4 stack is true.
Mar 04, 2019 12:04:43 AM com.hazelcast.instance.AddressPicker
INFO: [LOCAL] [dev] [3.10.5] Picked [192.168.0.105]:5702, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5702], bind any local is true
Mar 04, 2019 12:04:43 AM com.hazelcast.system
INFO: [192.168.0.105]:5702 [dev] [3.10.5] Hazelcast 3.10.5 (20180913 - 6ffa2ee) starting at [192.168.0.105]:5702
Mar 04, 2019 12:04:43 AM com.hazelcast.system
INFO: [192.168.0.105]:5702 [dev] [3.10.5] Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved.
Mar 04, 2019 12:04:43 AM com.hazelcast.system
INFO: [192.168.0.105]:5702 [dev] [3.10.5] Configured Hazelcast Serialization version: 1
Mar 04, 2019 12:04:43 AM com.hazelcast.instance.Node
INFO: [192.168.0.105]:5702 [dev] [3.10.5] A non-empty group password is configured for the Hazelcast member. Starting with Hazelcast version 3.8.2, members with the same group name, but with different group passwords (that do not use authentication) form a cluster. The group password configuration will be removed completely in a future release.
Mar 04, 2019 12:04:43 AM com.hazelcast.spi.impl.operationservice.impl.BackpressureRegulator
INFO: [192.168.0.105]:5702 [dev] [3.10.5] Backpressure is disabled
Mar 04, 2019 12:04:43 AM com.hazelcast.spi.impl.operationservice.impl.InboundResponseHandlerSupplier
INFO: [192.168.0.105]:5702 [dev] [3.10.5] Running with 2 response threads
Mar 04, 2019 12:04:44 AM com.hazelcast.instance.Node
INFO: [192.168.0.105]:5702 [dev] [3.10.5] Creating MulticastJoiner
Mar 04, 2019 12:04:45 AM com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl
INFO: [192.168.0.105]:5702 [dev] [3.10.5] Starting 4 partition threads and 3 generic threads (1 dedicated for priority tasks)
Mar 04, 2019 12:04:45 AM com.hazelcast.internal.diagnostics.Diagnostics
INFO: [192.168.0.105]:5702 [dev] [3.10.5] Diagnostics disabled. To enable add -Dhazelcast.diagnostics.enabled=true to the JVM arguments.
Mar 04, 2019 12:04:45 AM com.hazelcast.core.LifecycleService
INFO: [192.168.0.105]:5702 [dev] [3.10.5] [192.168.0.105]:5702 is STARTING
Mar 04, 2019 12:04:45 AM com.hazelcast.internal.cluster.impl.MulticastJoiner
INFO: [192.168.0.105]:5702 [dev] [3.10.5] Trying to join to discovered node: [192.168.0.105]:5701
Mar 04, 2019 12:04:45 AM com.hazelcast.nio.tcp.TcpIpConnector
INFO: [192.168.0.105]:5702 [dev] [3.10.5] Connecting to /192.168.0.105:5701, timeout: 0, bind-any: true
Mar 04, 2019 12:04:45 AM com.hazelcast.nio.tcp.TcpIpConnectionManager
INFO: [192.168.0.105]:5702 [dev] [3.10.5] Established socket connection between /192.168.0.105:58085 and /192.168.0.105:5701
Mar 04, 2019 12:04:51 AM com.hazelcast.system
INFO: [192.168.0.105]:5702 [dev] [3.10.5] Cluster version set to 3.10
Mar 04, 2019 12:04:51 AM com.hazelcast.internal.cluster.ClusterService
INFO: [192.168.0.105]:5702 [dev] [3.10.5] 

Members {size:2, ver:2} [
    Member [192.168.0.105]:5701 - 2a3f5096-5a47-4ade-9e0c-ce1d1d3b4e0f
    Member [192.168.0.105]:5702 - 8f030de2-dc19-4f02-96db-abcb53114bad this
]

Mar 04, 2019 12:04:52 AM com.hazelcast.core.LifecycleService
INFO: [192.168.0.105]:5702 [dev] [3.10.5] [192.168.0.105]:5702 is STARTED
Mar 04, 2019 12:04:53 AM io.vertx.core.impl.VertxImpl
WARNING: You're already on a Vert.x context, are you sure you want to create a new Vertx instance?
Mar 04, 2019 12:04:53 AM ua.home.customers.CustomerVerticle
INFO: Server has been successfully started
Mar 04, 2019 12:04:53 AM ua.home.customers.CustomerVerticle
INFO: []

Here is the Java project: https://github.com/b3lowster/samples_for_blog/tree/master/rx_sd_java_vertx

UPDATE: Can I publish a record to the ServiceDiscovery and use this record in another verticle in the Hazelcast cluster?

rx-java vert.x service-discovery
1 Answer

I found a clue: just running the verticle classes with the -cluster flag is not enough. The HazelcastClusterManager has to be initialized explicitly.

See my example below:

@Override
public void start(Future<Void> future) throws Exception {
    // Load the Hazelcast configuration (cluster.xml from the classpath, or the defaults)
    Config hazelcastConfig = ConfigUtil.loadConfig();
    // Both verticles must use the same group name to join the same cluster
    hazelcastConfig.getGroupConfig()
            .setName("tsv-cluster");
    ClusterManager mgr = new HazelcastClusterManager(hazelcastConfig);
    VertxOptions options = new VertxOptions().setClusterManager(mgr);
    // Create a clustered Vertx instance and use it for everything that
    // must be shared across nodes (HTTP server, ServiceDiscovery)
    Vertx.rxClusteredVertx(options).subscribe(vertx -> {
        // TODO
    });
}
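
For the record to be visible across the cluster, the ServiceDiscovery must also be created on the clustered Vertx instance. Calling Vertx.vertx() inside start() creates a fresh, unclustered instance (which is also what the "You're already on a Vert.x context" warning in both logs points at), so each verticle ends up with its own local discovery backend. Below is a minimal sketch of a corrected AccountVerticle, assuming the vertx-rx-java2 bindings and the same record name and ports as in the question:

public class AccountVerticle extends AbstractVerticle {

    private ServiceDiscovery discovery;

    @Override
    public void start(Future<Void> fut) {
        // Use the inherited `vertx` field (the clustered instance the
        // verticle was deployed on) instead of calling Vertx.vertx()
        Router router = Router.router(vertx);
        router.route().handler(BodyHandler.create());

        // Backed by the cluster manager, so records published here are
        // visible to CustomerVerticle on the other node as well
        discovery = ServiceDiscovery.create(vertx);

        vertx.createHttpServer()
                .requestHandler(router)
                .rxListen(8080)
                .flatMap(server -> discovery.rxPublish(
                        HttpEndpoint.createRecord("accounts", "localhost", 8080, "/api")))
                .subscribe(rec -> fut.complete(), fut::fail);
    }
}

With this change, CustomerVerticle can keep its rxGetRecords call unchanged: the record published on the 5701 node should show up on the 5702 node, because both ServiceDiscovery instances now share the cluster-wide backend.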