我正在开发一个应用程序,它创建一些Akka actor来管理和处理来自Kafka主题的消息。具有相同键的消息由同一个actor处理。我也使用消息键来命名相应的actor。
当从主题中读取新消息时,我不知道具有等于消息密钥的id的actor是否已经由actor系统创建。因此,我尝试使用其名称解析actor,如果它还不存在,我创建它。我需要管理关于actor解析的并发性。因此,如果存在actor,则可能有多个客户端询问actor系统。
我现在使用的代码如下:
private CompletableFuture<ActorRef> getActor(String uuid) {
return system.actorSelection(String.format("/user/%s", uuid))
.resolveOne(Duration.ofMillis(1000))
.toCompletableFuture()
.exceptionally(ex ->
system.actorOf(Props.create(MyActor.class, uuid), uuid))
.exceptionally(ex -> {
try {
return system.actorSelection(String.format("/user/%s",uuid)).resolveOne(Duration.ofMillis(1000)).toCompletableFuture().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}
上述代码未经过优化,可以更好地处理异常。
但是,在Akka中是否有一种更惯用的解决方法,或者如果它不存在则创建它?我错过了什么吗?
考虑创建一个actor,它将状态保存为ActorRef
s的消息ID映射。这个“接待员”演员将处理获得消息处理演员的所有请求。当接待员收到对演员的请求(请求将包括消息ID)时,它会尝试在其地图中查找关联的演员:如果找到这样的演员,则将ActorRef
返回给发件人;否则它会创建一个新的处理actor,将该actor添加到其地图中,并将该actor引用返回给发送者。
杰弗里钟的回答确实是阿卡的方式。这种方法的缺点是性能低下。最高效的解决方案是使用Java的ConcurrentHashMap.computeIfAbsent()方法。
我会考虑使用akka-cluster
和akka-cluster-sharding
。首先,这为您提供了吞吐量以及可靠性。但是,它还将使系统管理“实体”角色的创建。
但你必须改变与那些演员交谈的方式。你创建了一个处理所有消息的ShardRegion
actor:
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.sharding.ClusterSharding;
import akka.cluster.sharding.ClusterShardingSettings;
import akka.cluster.sharding.ShardRegion;
import akka.event.Logging;
import akka.event.LoggingAdapter;
public class MyEventReceiver extends AbstractActor {
private final ActorRef shardRegion;
public static Props props() {
return Props.create(MyEventReceiver.class, MyEventReceiver::new);
}
static ShardRegion.MessageExtractor messageExtractor
= new ShardRegion.HashCodeMessageExtractor(100) {
// using the supplied hash code extractor to shard
// the actors based on the hashcode of the entityid
@Override
public String entityId(Object message) {
if (message instanceof EventInput) {
return ((EventInput) message).uuid().toString();
}
return null;
}
@Override
public Object entityMessage(Object message) {
if (message instanceof EventInput) {
return message;
}
return message; // I don't know why they do this it's in the sample
}
};
public MyEventReceiver() {
ActorSystem system = getContext().getSystem();
ClusterShardingSettings settings =
ClusterShardingSettings.create(system);
// this is setup for the money shot
shardRegion = ClusterSharding.get(system)
.start("EventShardingSytem",
Props.create(EventActor.class),
settings,
messageExtractor);
}
@Override
public Receive createReceive() {
return receiveBuilder().match(
EventInput.class,
e -> {
log.info("Got an event with UUID {} forwarding ... ",
e.uuid());
// the money shot
deviceRegion.tell(e, getSender());
}
).build();
}
}
所以这个Actor MyEventReceiver
在你的集群的所有节点上运行,并封装了shardRegion
Actor。您不再直接向EventActor
s发送消息,但是,使用MyEventReceiver
和deviceRegion
Actors,您可以使用分片系统跟踪特定EventActor
所在群集中的哪个节点。如果之前没有创建过,它将创建一个,或者如果有消息则将其路由。每个EventActor
必须有一个唯一的id:从消息中提取(所以UUID
非常适合它,但它可能是一些其他id,如customerID,或orderID,或者其他什么,只要它的独特之处你想用它来处理它的Actor实例。
(我省略了EventActor
代码,否则它是一个非常普通的Actor,取决于你正在做什么,'魔术'在上面的代码中)。
基于您选择的算法,分片系统自动知道创建EventActor
并将其分配到分片(在这种特殊情况下,它基于唯一ID的hashCode
,这是我曾经使用过的)。此外,对于任何给定的唯一ID,您只能保证一个Actor。消息透明地路由到正确的节点和碎片,无论它在哪里;从发送的Node和Shard中选择。
Akka网站和文档中有更多信息和示例代码。
这是一种非常好的方法,可以确保相同的实体/ Actor始终处理对其有意义的消息。集群和分片需要自动处理正确分配Actors和故障转移等(如果Actor有一堆与之关联的严格状态(必须恢复),你必须添加akka-persistence
以获得钝化,补液和故障转移))。