无法在Akka Stream中使用GraphStage类运行SourceShape

问题描述 投票:2回答:1

我正在尝试使用GraphStage构造创建Redis Akka流源。这个想法是,每当我从subscription方法获得更新时,我都会将其推送到下一个组件。同样,如果没有拉动信号,则组件应背压。这是代码:

class SubscriberSourceShape(channel: String, subscriber: Subscriber) 
    extends GraphStage[SourceShape[String]] {

  private val outlet: Outlet[String] = Outlet("SubscriberSource.Out")

  override def shape: SourceShape[String] = SourceShape(outlet)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    new GraphStageLogic(shape) {
      val callback = getAsyncCallback((msg: String) => push(outlet, msg)
      val handler = (msg: String) => callback.invoke(msg)

      override def preStart(): Unit = subscriber.subscribe(channel)(handler)
    }
  }
}

但是,当我用一个简单的接收器运行它时,出现此错误:

Error in stage [akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@58344854]: No handler defined in stage [com.craysoft.quote.api.stream.SubscriberSourceShape@6a9193dd] for out port [RedisSubscriberSource.Out(1414886352). All inlets and outlets must be assigned a handler with setHandler in the constructor of your graph stage logic.

怎么了?

scala akka akka-stream
1个回答
1
投票

您收到此错误是因为您没有为Source设置任何输出处理程序,因此,当下游组件(Flow或Sink)向该Source发送拉动信号时,将没有处理该拉动信号的处理程序。

您可以添加OutputHandler来消除该错误。将onPull方法留空,因为您要在asyncCallback上生成元素。只需将其添加到GraphStageLogic主体中即可:

      setHandler(outlet, new OutHandler {
        override def onPull(): Unit = {}
      })
© www.soinside.com 2019 - 2024. All rights reserved.