缓存一个Akka子流源并重用它?

问题描述 投票:0回答:1

是否可以缓存源并重用它而不触发“子流不能多次实现”?

我正在进行流连接,需要调用左边每个元素的微服务。该调用返回要加入的记录流。我想缓存源,以便对缓存流中的微服务进行相同的调用。但我正在做的flatMapConcat抛出“Substream无法实现多次”错误。代码如下所示:

    val cache = new util.HashMap[AnyRef, Source[Array[AnyRef], Any]]()

    inputSource
        .flatMapConcat { record =>
          val key = leftKey(record)
          val rightElemSource = if (cache.containsKey(key)) {
            cache.get(key)
          } else {
            val rightElemSourceInner = doSomethingToGetSource()
            cache.put(key, rightElemSourceInner)
            rightElemSourceInner
          }

          rightElemSource.map(join(record, _))
        }
scala akka akka-stream
1个回答
1
投票

Source代表了潜在的巨大甚至无限的数据流。它被设计为仅像Iterator一样遍历。如果您真的希望重用源代码的内容,则必须将其收集到常规数据结构中,例如Seq。所以你的缓存是util.HashMap[AnyRef, Seq[Array[AnyRef], Any]]类型。

© www.soinside.com 2019 - 2024. All rights reserved.