Scala、ZIO、ZStream - 如何将自定义数据对象流式传输到端点?

问题描述 投票:0回答:0

我想以一定的重复时间从

Zstream
流式传输数据。我有我的主要功能,它返回
ZIO

def processData(request: MyRequest): Task[Seq[SomePayload]]

我也为列表中的每个元素调用此方法

requests
.

现在我想获取此输出并流式传输我在

Seq[SomePayload]
中的 json。我创建了一个带有端点的流:

  val streamingServerEndpoint: ZServerEndpoint[Any, ZioStreams] = streamingEndpoint.zServerLogic { _ =>
    val stream  =
      ZStream.fromZIO(ZIO.collectAll(requests.collect(service.processData(_))).repeat(Schedule.minuteOfHour(30))).map(_.toByte)

    ZIO.succeed((100L, stream))
  }

端点:

  val streamingEndpoint: PublicEndpoint[Unit, Unit, (Long, Stream[Throwable, Byte]), ZioStreams] =
    endpoint.get
      .in("receive")
      .out(header[Long](HeaderNames.ContentLength))
      .out(streamTextBody(ZioStreams)(CodecFormat.TextPlain(), Some(StandardCharsets.UTF_8)))

但是当我运行这段代码时,我可以向

/receive
端点发出请求,但输出中没有任何流式传输。在日志中,我看到
processData
方法有效并且它返回数据,但端点上没有任何内容。我应该如何更改此代码以在 API 上流式传输 json(或作为字节的 json)? 我还想每小时为所有请求运行
processData
(每小时 30 分钟)。

scala stream streaming zio
© www.soinside.com 2019 - 2024. All rights reserved.