在Redis pub / sub和Akka Streams中使用SSE的最简单方法是什么?

问题描述 投票:1回答:2

我想为以下场景流式传输分块服务器发送事件:

订阅Redis密钥,如果密钥更改,则使用Akka Streams流式传输新值。它应该只在有新值时才流。

据我了解,我需要一个Source。我猜这是订阅频道:

redis.subscriber.subscribe("My Channel") {
  case message @ PubSubMessage.Message(channel, messageBytes) => println(
    message.readAs[String]()
  )
  case PubSubMessage.Subscribe(channel, subscribedChannelsCount) => println(
    s"Successfully subscribed to $channel"
  )
}

在我的路线中,我需要从中创建一个Source,但说实话,我不知道如何开始:

val route: Route =
  path("stream") {
   get {
     complete {
       val source: Source[ServerSentEvent, NotUsed] =
         Source
          .asSubscriber(??) // or fromPublisher???
      .map(_ => {
        ??
      })
      .map(toServerSentEvent)
      .keepAlive(1.second, () => ServerSentEvent.heartbeat)
      .log("stream")
     }
   }
scala redis akka akka-stream akka-http
2个回答
1
投票

一种方法是使用Source.actorRefBroadcastHub.sink

val (sseActor, sseSource) =
  Source.actorRef[String](10, akka.stream.OverflowStrategy.dropTail)
    .map(toServerSentEvent) // converts a String to a ServerSentEvent
    .keepAlive(1.second, () => ServerSentEvent.heartbeat)
    .toMat(BroadcastHub.sink[ServerSentEvent])(Keep.both)
    .run()

将物化的ActorRef订阅到您的消息通道:发送给此actor的消息将向下游发出。如果没有下游需求,则使用指定的溢出策略将消息缓冲到一定数量(在此示例中,缓冲区大小为10)。请注意,此方法没有背压。

redis.subscriber.subscribe("My Channel") {
  case message @ PubSubMessage.Message(channel, messageBytes) =>
    val strMsg = message.readAs[String]
    println(strMsg)
    sseActor ! strMsg

  case ...
}

另请注意,上面的示例使用了Source.actorRef[String];根据需要调整类型和示例(例如,它可能是Source.actorRef[PubSubMessage.Message])。

你可以在你的路径中使用物化的Source

path("stream") {
  get {
    complete(sseSource)
  }
}

1
投票

另一种方法可以是创建Source作为队列,并将该元素提供给在订阅者回调中接收的队列

val queue =
  Source
    .queue[String](10, OverflowStrategy.dropHead) // drops the oldest element from the buffer to make space for the new element.
    .map(toServerSentEvent) // converts a String to a ServerSentEvent
    .keepAlive(1.second, () => ServerSentEvent.heartbeat)
    .to(Sink.ignore)
    .run()

在订户中

    redis.subscriber.subscribe("My Channel") {
  case message @ PubSubMessage.Message(channel, messageBytes) =>
    val strMsg = message.readAs[String]
    println(strMsg)
    queue.offer(strMsg)

  case ...
}
© www.soinside.com 2019 - 2024. All rights reserved.