在 Akka 类型化事件溯源中,对多个类型化持久实体使用单个数据库(相同的事件日志)是否很常见?

问题描述 投票:0回答:1

假设应用程序中

office
有两个
EventSourcedBehavior
演员

trait OfficeFridgeCommand
case object OpenFridge extends OfficeFridgeCommand
case object CloseFridge extends OfficeFridgeCommand

trait OfficeFridgeEvent
case object FridgeOpened extends OfficeFridgeEvent
case object FridgeClosed extends OfficeFridgeEvent

trait OfficeCoffeeMachineCommand
case object MakeCoffee extends OfficeCoffeeMachineCommand

trait OfficeCoffeeMachineEvent
case object CoffeeMade extends OfficeCoffeeMachineEvent

val fridgeEntity = 
  EventSourcedBehavior[OfficeFridgeCommand, OfficeFridgeEvent, OfficeFridge]()

val frontDoorEntity = 
  EventSourcedBehavior[OfficeFrontDoorCommand,, OfficeFrontDoorEvent, OfficeFridge]()

val coffeeMachineEntity = 
  EventSourcedBehavior[OfficeCoffeeMachineCommand, OfficeCoffeeMachineEvent, OfficeFridge]()

假设冰箱发生了一些操作,并且冰箱上注册了 1000 个具有不同持久性 ID [0-1000] 的事件。

使得期刊喜欢:

订购 persistence_id event_ser_manifest
1 200 冰箱打开
2 200 冰箱关闭
... ... ...
500 500 冰箱打开
... ... ...
1000 501 冰箱关闭

如果有一条持久性 ID 为 500 的

GetCoffeeMachineState
消息传入
frontDoorEntity
actor。
frontDoorEntity
将尝试重播
persistence_id = 500
日志事件。 它会失败,因为它无法将
OfficeFridgeEvent
投射到
OfficeCoffeeMachineEvent
actor(akka 输入还记得吗?)。

这是此类系统的常见设置吗?或者每个实体是否都需要自己的数据库,该数据库具有仅包含参与者接受的“有效”类型事件的事件日志?

我现在在我的系统中看到了这个确切的问题。如果有人(意外地)运行 1000 个这样的查询,我将有 1000 个实体参与者尝试永远重播这些事件,或者直到我重新启动 Pod。

我最终得到的是无限次尝试使用以下堆栈跟踪重新启动实体参与者

 at akka.persistence.typed.internal.ReplayingEvents.onJournalResponse(ReplayingEvents.scala:200) 
 at akka.persistence.typed.internal.ReplayingEvents.onMessage(ReplayingEvents.scala:98) 
 at akka.persistence.typed.internal.ReplayingEvents.onMessage(ReplayingEvents.scala:73)
Caused by: java.lang.ClassCastException

这是有道理的,因为类型化的参与者正在尝试处理不同类型的事件。

scala akka event-sourcing akka-persistence
1个回答
0
投票

您说了多个实体(在您的示例中为

fridge
front door
coffe machine
)。每个实体回复不同的命令坚持不同的事件

当您使用

EventSourcedBehavior.apply() 创建 
EventSourcedBehavior

  /**
   * Create a `Behavior` for a persistent actor.
   *
   * @param persistenceId stable unique identifier for the event sourced behavior
   * @param emptyState the intial state for the entity before any events have been processed
   * @param commandHandler map commands to effects e.g. persisting events, replying to commands
   * @param eventHandler compute the new state given the current state when an event has been persisted
   */
  def apply[Command, Event, State](
      persistenceId: PersistenceId,
      emptyState: State,
      commandHandler: (State, Command) => Effect[Event, State],
      eventHandler: (State, Event) => State): EventSourcedBehavior[Command, Event, State] = {
    val loggerClass = LoggerClass.detectLoggerClassFromStack(classOf[EventSourcedBehavior[_, _, _]], logPrefixSkipList)
    EventSourcedBehaviorImpl(persistenceId, emptyState, commandHandler, eventHandler, loggerClass)
  }

第一个参数是PersistenceId。您负责使该 ID 独一无二。该对象提供了一个 factory 方法,要求您提供提示和实体 ID

  /**
   * Constructs a [[PersistenceId]] from the given `entityTypeHint` and `entityId` by
   * concatenating them with `|` separator.
   *
   * Cluster Sharding is often used together with `EventSourcedBehavior` for the entities.
   * The `PersistenceId` of the `EventSourcedBehavior` can typically be constructed with:
   * {{{
   * PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId)
   * }}}
   *
   * That format of the `PersistenceId` is not mandatory and only provided as a convenience of
   * a "standardized" format.
   *
   * Another separator can be defined by using the `apply` that takes a `separator` parameter.
   *
   * The `|` separator is also used in Lagom's `scaladsl.PersistentEntity` but no separator is used
   * in Lagom's `javadsl.PersistentEntity`. For compatibility with Lagom's `javadsl.PersistentEntity`
   * you should use `""` as the separator.
   *
   * @throws IllegalArgumentException if the `entityTypeHint` or `entityId` contains `|`
   */
  def apply(entityTypeHint: String, entityId: String): PersistenceId

正如事件溯源 - PersistenceId

中详细介绍的那样

持久化 ID

PersistenceId是后端事件日志和快照存储中持久参与者的稳定唯一标识符。

集群分片通常与

EventSourcedBehavior
一起使用,以确保每个
PersistenceId
entityId
)只有一个活动实体。有一些技术可以确保这种唯一性,可以在集群分片文档中的持久性示例中找到示例。这说明了如何从
EntityContext
提供的
PersistenceId
entityTypeKey
构造 entityId

Cluster Sharding中的

entityId
是实体的业务域标识。
entityId
可能不够独特,无法单独用作
PersistenceId
。例如,两种不同类型的实体可能具有相同的
entityId
。要创建唯一的
PersistenceId
entityId
应以实体类型的稳定名称作为前缀,该名称通常与集群分片中使用的
EntityTypeKey.name
相同。有
PersistenceId.apply
工厂方法可以帮助从
PersistenceId
entityTypeHint
构造这样的
entityId

您可以以 akka 持久化购物车示例 - ShoppingCart Behaviour 作为一个很好的例子

  def apply(cartId: String): Behavior[Command] = {
    EventSourcedBehavior[Command, Event, State](
      PersistenceId("ShoppingCart", cartId),
      State.empty,
      (state, command) => ...,
      (state, event) => ...
    )
  }

你的代码应该是这样的

val fridgeEntity = 
  EventSourcedBehavior[OfficeFridgeCommand, OfficeFridgeEvent, OfficeFridge](
  PersistenceId("Fridge"), UUID.randomUUID(),
  Fridge.emptyState,
  Fridge.commandHandler,
  Fridge.eventHandler
)

val frontDoorEntity = 
  EventSourcedBehavior[OfficeFrontDoorCommand,, OfficeFrontDoorEvent, OfficeFridge](
  PersistenceId("FrontDoor"), UUID.randomUUID(),
  FrontDoor.emptyState,
  FrontDoor.commandHandler,
  FrontDoor.eventHandler
)

val coffeeMachineEntity = 
  EventSourcedBehavior[OfficeCoffeeMachineCommand, OfficeCoffeeMachineEvent, OfficeFridge](
  PersistenceId("CoffeeMachine"), UUID.randomUUID(),
  CoffeeMachine.emptyState,
  CoffeeMachine.commandHandler,
  CoffeeMachine.eventHandler
)

将实体的事件持久保存在日志中后,您将能够在数据库中看到类似以下内容

订购 persistence_id event_ser_manifest
1 冰箱| 冰箱打开
2 冰箱| 冰箱关闭
... ... ...
1 咖啡机| 咖啡制作
2 咖啡机| 咖啡制作
... ... ...
1 前门| 前门打开
2 前门| 前门关闭

我需要将所有内容都放在同一个数据库中吗?我应该使用关系型数据库还是非关系型数据库?

一如既往,这取决于情况。如果您需要每秒保留数百万个事件,并且这些事件来自数百万个不同的设备,那么答案是否定的。 Akka 持久化提供不同的持久化插件

每个都有自己的优点和缺点。

您可以在 scala index - akka persistence 找到更多插件,但它们可能已经过时,没有商业支持等。

© www.soinside.com 2019 - 2024. All rights reserved.