如何从Akka事件流构建Akka流源?

问题描述 投票:0回答:2

[MyActor收到Start消息时,它将运行Akka Stream,并将收到的每个项目发布到Akka Event Stream

class MyActor (implicit system: ActorSystem, materialize: Materializer, ec: ExecutionContext) extends Actor {

  override def receive: Receive = {
    case Start =>
      someSource
        .toMat(Sink.foreach(item => system.eventStream.publish(item)))(Keep.left)
        .run()
  }
}

现在在另一个代码块中,我想从该事件流中的那些项目中构建一个Source,因此可以在另一个Akka Stream中处理每个发布的项目。

我该怎么做?

如果可能会添加更多选项,请注意,所讨论的另一代码块是Play framework的Websocket处理程序。

scala akka akka-stream
2个回答
1
投票

这似乎是XY problem。如果发布者和订阅者最终脱钩,那么如果发布者生成数据的速度比订阅者快,该怎么办?

话虽如此,这是一种您所要求的方式:

/** Produce a source by subscribing to the Akka actorsystem event bus for a
  * specific event type.
  * 
  * @param bufferSize max number of events to buffer up in the source
  * @param overflowStrategy what to do if the event buffer fills up
  */
def itemSource[Item : ClassTag](
  bufferSize: Int = 1000,
  overflowStrategy: OverflowStrategy = OverflowStrategy.dropNew
)(
  implicit system: ActorSystem
): Source[Item, NotUsed] = Source
  .lazily { () =>
    val (actorRef, itemSource) = Source
      .actorRef[Item](
        completionMatcher = PartialFunction.empty,
        failureMatcher = PartialFunction.empty,
        bufferSize,
        overflowStrategy
      )
      .preMaterialize()

    system.eventStream.subscribe(actorRef, classTag[Item].runtimeClass)

    itemSource
  }
  .mapMaterializedValue(_ => NotUsed)


0
投票

我终于让它与BroadcastHub一起使用,不再有Akka事件流。

我的发布者(本身正在使用来源)看起来像这样:

val publisherSource = someSource
  .toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right)
  .run()

然后在另一个代码块中,我只需要引用PublisherSource:

val subscriberSource = publisherSource
  .map(...) // What ever

您可以根据需要拥有任意数量的SubscriberSource,他们都将收到相同的项目。

© www.soinside.com 2019 - 2024. All rights reserved.