Akka集群收不到消息

问题描述 投票:0回答:1

我正在尝试使用“集群中的经典分布式发布订阅”的概念从微服务 A 内的 senderActor 发送消息我正在尝试将消息发布到内容主题,但我没有在 ReceiverActor 中接收它位于微服务 B 中。您是否了解可能导致此问题的原因?

谢谢!

A 中的 Conf 文件

akka {
  actor {
    provider = "cluster"
  }
  remote {
    artery {
      enabled = on
      canonical {
        hostname = "localhost"
        port = 2551
      }
    }
  }
  cluster {
    seed-nodes = [
      "akka://cluster-system@localhost:2551"
    ]
  }
}

发送者演员:

public class SenderActor extends AbstractActor {
    ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(
                        String.class,
                        in -> {
                            String out = in.toUpperCase();
                            mediator.tell(new DistributedPubSubMediator.Publish("content", out), getSelf());
                        })
                .build();
    }
}

微服务A中的main:

    public static void main(String[] args) {
        SpringApplication.run(MicroserviceAaApplication.class, args);
        Config config = ConfigFactory.load("application.conf");

        ActorSystem system = ActorSystem.create("cluster-system", config);
        Cluster cluster = Cluster.get(system);
        ActorRef publisher = system.actorOf(Props.create(SenderActor.class), "publisher");
        publisher.tell("hello from microservice A ", null);
        
    }

B 中的 Conf 文件

akka {
  actor {
    provider = "cluster"
  }
  remote {
    artery {
      enabled = on
      canonical {
        hostname = "localhost"
        port = 2552
      }
    }
  }
  cluster {
    seed-nodes = [
      "akka://cluster-system@localhost:2551"
    ]
  }
}

ReceiverActor 类:

public class ReceiverActor extends AbstractActor {
    LoggingAdapter log = Logging.getLogger(getContext().system(), this);
    public ReceiverActor() {
        ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
        mediator.tell(new DistributedPubSubMediator.Subscribe("content", getSelf()), getSelf());
    }
    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(String.class, msg -> {
                    log.info("Received message in receiver-actor-b: {}", msg);
                })
                .match(DistributedPubSubMediator.SubscribeAck.class,
                        msg -> log.info("SubscribeAck: {}", msg))
                .build();
    }
}

微服务B中的主要:

    public static void main(String[] args) {
        SpringApplication.run(MicroserviceBbApplication.class, args);
        Config config = ConfigFactory.load("application.conf");

        ActorSystem system = ActorSystem.create("cluster-system", config);
        Cluster cluster = Cluster.get(system);
        ActorRef subscriber = system.actorOf(Props.create(ReceiverActor.class), "subscriber");
    }

尝试获取消息“来自微服务 A 的问候”,但我只得到 B 中的流动日志:

2023-08-20T15:53:14.067+02:00  INFO 7788 --- [           main] c.e.M.MicroserviceBbApplication          : Started MicroserviceBbApplication in 0.909 seconds (process running for 1.723)
2023-08-20T15:53:14.552+02:00  INFO 7788 --- [lt-dispatcher-5] akka.event.slf4j.Slf4jLogger             : Slf4jLogger started
2023-08-20T15:53:14.900+02:00  INFO 7788 --- [lt-dispatcher-5] akka.remote.artery.ArteryTransport       : Remoting started with transport [Artery tcp]; listening on address [akka://cluster-system@localhost:2552] with UID [6730186213375081245]
2023-08-20T15:53:14.925+02:00  INFO 7788 --- [lt-dispatcher-5] akka.cluster.Cluster                     : Cluster Node [akka://cluster-system@localhost:2552] - Starting up, Akka version [2.8.3] ...
2023-08-20T15:53:14.980+02:00  INFO 7788 --- [lt-dispatcher-5] akka.cluster.Cluster                     : Cluster Node [akka://cluster-system@localhost:2552] - Registered cluster JMX MBean [akka:type=Cluster]
2023-08-20T15:53:14.981+02:00  INFO 7788 --- [lt-dispatcher-5] akka.cluster.Cluster                     : Cluster Node [akka://cluster-system@localhost:2552] - Started up successfully
2023-08-20T15:53:15.006+02:00  INFO 7788 --- [t-dispatcher-13] akka.cluster.Cluster                     : Cluster Node [akka://cluster-system@localhost:2552] - No downing-provider-class configured, manual cluster downing required, see https://doc.akka.io/docs/akka/current/typed/cluster.html#downing
2023-08-20T15:53:15.624+02:00  INFO 7788 --- [t-dispatcher-13] akka.cluster.Cluster                     : Cluster Node [akka://cluster-system@localhost:2552] - Received InitJoinAck message from [Actor[akka://cluster-system@localhost:2551/system/cluster/core/daemon#1745370250]] to [akka://cluster-system@localhost:2552]
2023-08-20T15:53:15.675+02:00  INFO 7788 --- [t-dispatcher-13] akka.cluster.Cluster                     : Cluster Node [akka://cluster-system@localhost:2552] - Welcome from [akka://cluster-system@localhost:2551]
2023-08-20T15:53:16.077+02:00  INFO 7788 --- [lt-dispatcher-5] c.example.MicroserviceBB.ReceiverActor   : SubscribeAck: SubscribeAck(Subscribe(content,None,Actor[akka://cluster-system/user/subscriber1#-339648796]))
2023-08-20T16:24:19.549+02:00  INFO 7788 --- [t-dispatcher-40] akka.actor.CoordinatedShutdown           : Running CoordinatedShutdown with reason [JvmExitReason]

Process finished with exit code 130

akka akka-cluster distributedpubsub
1个回答
0
投票

重要的是要记住,分布式 PubSub 只会向中介者收到已发布消息时恰好订阅的订阅者传递消息(我稍微简化了一点,但这是直觉)。

在微服务 A 的

main
中,您开始集群形成(形成集群的过程在后台发生),然后立即生成
SenderActor
并向其发送一条消息,该消息将转发给中介者。另请注意,集群可以仅由一个节点组成(因为微服务 A 将自身定义为种子节点),因此您在微服务 B 中的
ReceiverActor
订阅之前将消息发送到中介器;结果,该消息被丢弃。

在这种情况下,在微服务 A 中拥有一个订阅不同 pubsub 主题的参与者可能会很有用;

ReceiverActor
发布到它已订阅
"content"
主题的主题,这会导致微服务 A 中的订阅参与者
tell
SenderActor
它可以向该主题发布消息。

附带说明一下,目前尚不清楚微服务 A 和 B 是同一微服务的实例还是真正不同的微服务。如果是后者,不建议使用 Akka Cluster 进行微服务之间的通信

© www.soinside.com 2019 - 2024. All rights reserved.