检索Akka actor或如果它不存在则创建它

问题描述 投票:4回答:3

我正在开发一个应用程序,它创建一些Akka actor来管理和处理来自Kafka主题的消息。具有相同键的消息由同一个actor处理。我也使用消息键来命名相应的actor。

当从主题中读取新消息时,我不知道具有等于消息密钥的id的actor是否已经由actor系统创建。因此,我尝试使用其名称解析actor,如果它还不存在,我创建它。我需要管理关于actor解析的并发性。因此,如果存在actor,则可能有多个客户端询问actor系统。

我现在使用的代码如下:

private CompletableFuture<ActorRef> getActor(String uuid) {
    return system.actorSelection(String.format("/user/%s", uuid))
                 .resolveOne(Duration.ofMillis(1000))
                 .toCompletableFuture()
                 .exceptionally(ex -> 
                     system.actorOf(Props.create(MyActor.class, uuid), uuid))
                 .exceptionally(ex -> {
                     try {
                         return system.actorSelection(String.format("/user/%s",uuid)).resolveOne(Duration.ofMillis(1000)).toCompletableFuture().get();
                     } catch (InterruptedException | ExecutionException e) {
                         throw new RuntimeException(e);
                     }
                 });
}

上述代码未经过优化,可以更好地处理异常。

但是,在Akka中是否有一种更惯用的解决方法,或者如果它不存在则创建它?我错过了什么吗?

java scala akka actor
3个回答
4
投票

考虑创建一个actor,它将状态保存为ActorRefs的消息ID映射。这个“接待员”演员将处理获得消息处理演员的所有请求。当接待员收到对演员的请求(请求将包括消息ID)时,它会尝试在其地图中查找关联的演员:如果找到这样的演员,则将ActorRef返回给发件人;否则它会创建一个新的处理actor,将该actor添加到其地图中,并将该actor引用返回给发送者。


1
投票

杰弗里钟的回答确实是阿卡的方式。这种方法的缺点是性能低下。最高效的解决方案是使用Java的ConcurrentHashMap.computeIfAbsent()方法。


1
投票

我会考虑使用akka-clusterakka-cluster-sharding。首先,这为您提供了吞吐量以及可靠性。但是,它还将使系统管理“实体”角色的创建。

但你必须改变与那些演员交谈的方式。你创建了一个处理所有消息的ShardRegion actor:

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.sharding.ClusterSharding;
import akka.cluster.sharding.ClusterShardingSettings;
import akka.cluster.sharding.ShardRegion;
import akka.event.Logging;
import akka.event.LoggingAdapter;


public class MyEventReceiver extends AbstractActor {

    private final ActorRef shardRegion;

    public static Props props() {
        return Props.create(MyEventReceiver.class, MyEventReceiver::new);
    }

    static ShardRegion.MessageExtractor messageExtractor 
      = new ShardRegion.HashCodeMessageExtractor(100) {
            // using the supplied hash code extractor to shard
            // the actors based on the hashcode of the entityid

        @Override
        public String entityId(Object message) {
            if (message instanceof EventInput) {
                return ((EventInput) message).uuid().toString();
            }
            return null;
        }

        @Override
        public Object entityMessage(Object message) {
            if (message instanceof EventInput) {
                return message;
            }
            return message; // I don't know why they do this it's in the sample
        }
    };


    public MyEventReceiver() {
        ActorSystem system = getContext().getSystem();
        ClusterShardingSettings settings =
           ClusterShardingSettings.create(system);
        // this is setup for the money shot
        shardRegion = ClusterSharding.get(system)
                .start("EventShardingSytem",
                        Props.create(EventActor.class),
                        settings,
                        messageExtractor);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder().match(
                EventInput.class,
                e -> {
                    log.info("Got an event with UUID {} forwarding ... ",
                            e.uuid());
                    // the money shot
                    deviceRegion.tell(e, getSender());
                }
        ).build();
    }
}

所以这个Actor MyEventReceiver在你的集群的所有节点上运行,并封装了shardRegion Actor。您不再直接向EventActors发送消息,但是,使用MyEventReceiverdeviceRegion Actors,您可以使用分片系统跟踪特定EventActor所在群集中的哪个节点。如果之前没有创建过,它将创建一个,或者如果有消息则将其路由。每个EventActor必须有一个唯一的id:从消息中提取(所以UUID非常适合它,但它可能是一些其他id,如customerID,或orderID,或者其他什么,只要它的独特之处你想用它来处理它的Actor实例。

(我省略了EventActor代码,否则它是一个非常普通的Actor,取决于你正在做什么,'魔术'在上面的代码中)。

基于您选择的算法,分片系统自动知道创建EventActor并将其分配到分片(在这种特殊情况下,它基于唯一ID的hashCode,这是我曾经使用过的)。此外,对于任何给定的唯一ID,您只能保证一个Actor。消息透明地路由到正确的节点和碎片,无论它在哪里;从发送的Node和Shard中选择。

Akka网站和文档中有更多信息和示例代码。

这是一种非常好的方法,可以确保相同的实体/ Actor始终处理对其有意义的消息。集群和分片需要自动处理正确分配Actors和故障转移等(如果Actor有一堆与之关联的严格状态(必须恢复),你必须添加akka-persistence以获得钝化,补液和故障转移))。

© www.soinside.com 2019 - 2024. All rights reserved.