从Lagom / Akka Kafka为Websocket主题订阅者创建源代码

问题描述 投票:0回答:1

我希望我的仅限Lagom订阅者服务订阅Kafka主题并将消息流式传输到websocket。我使用此文档(https://www.lagomframework.com/documentation/1.4.x/scala/MessageBrokerApi.html#Subscribe-to-a-topic)作为指导,定义如下服务:

    // service call
    def stream(): ServiceCall[Source[String, NotUsed], Source[String, NotUsed]]

    // service implementation
    override def stream() = ServiceCall { req =>
      req.runForeach(str => log.info(s"client: %str"))
      kafkaTopic().subscribe.atLeastOnce(Flow.fromFunction(
        // add message to a Source and return Done
      ))
      Future.successful(//some Source[String, NotUsed])

但是,我无法弄清楚如何处理我的kafka消息。 Flow.fromFunction返回[String, Done, _]并暗示我需要将这些消息(字符串)添加到已在订阅者之外创建的Source。

所以我的问题有两个:1)如何创建一个akka流源来在运行时从kafka主题订阅者接收消息? 2)如何在Flow中将kafka消息附加到所述源?

scala websocket apache-kafka akka-stream lagom
1个回答
1
投票

您似乎误解了Lagom的服务API。如果您尝试从服务呼叫的主体中实现流,则您的呼叫没有输入;即,

def stream(): ServiceCall[Source[String, NotUsed], Source[String, NotUsed]]

意味着当客户提供Source[String, NotUsed]时,该服务将以实物形式作出回应。您的客户不直接提供此服务;因此,您的签名应该是

def stream(): ServiceCall[NotUsed, Source[String, NotUsed]]

现在问你的问题......

这实际上并不存在于scala giter8模板中,但java版本包含他们称之为autonomous stream的内容,它大致可以执行您想要执行的操作。

在Scala中,此代码看起来像......

override def autonomousStream(): ServiceCall[
  Source[String, NotUsed], 
  Source[String, NotUsed]
] = ServiceCall { hellos => Future {
    hellos.mapAsync(8, ...)
  }
}

由于您的调用不是映射输入流,而是映射kafka主题,因此您需要执行以下操作:

override def stream(): ServiceCall[NotUsed, Source[String, NotUsed]] = ServiceCall { 
  _ => 
    Future {
      kafkaTopic()
        .subscribe
        .atMostOnce
        .mapAsync(...)
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.