是否可以缓存源并重用它而不触发“子流不能多次实现”?
我正在进行流连接,需要调用左边每个元素的微服务。该调用返回要加入的记录流。我想缓存源,以便对缓存流中的微服务进行相同的调用。但我正在做的flatMapConcat抛出“Substream无法实现多次”错误。代码如下所示:
val cache = new util.HashMap[AnyRef, Source[Array[AnyRef], Any]]()
inputSource
.flatMapConcat { record =>
val key = leftKey(record)
val rightElemSource = if (cache.containsKey(key)) {
cache.get(key)
} else {
val rightElemSourceInner = doSomethingToGetSource()
cache.put(key, rightElemSourceInner)
rightElemSourceInner
}
rightElemSource.map(join(record, _))
}
Source
代表了潜在的巨大甚至无限的数据流。它被设计为仅像Iterator
一样遍历。如果您真的希望重用源代码的内容,则必须将其收集到常规数据结构中,例如Seq
。所以你的缓存是util.HashMap[AnyRef, Seq[Array[AnyRef], Any]]
类型。