我来自ReactiveX,我们有运营商推迟,以便创建一个Observable
并在我们有订户后获得排放值。
在Akka Streams,我想知道是否存在类似的东西:
@Test def defer(): Unit = {
var range = 0 to 10
val graphs = Source(range)
.to(Sink.foreach(println))
range = 10 to 20
graphs.run()
Thread.sleep(2000)
}
拥有此代码,甚至在我们执行run()之前,更改范围的值,由于蓝图已经创建,因此值不会更改,并且会发出0到10。
有没有像Akka Streams中的Observable.defer
?
解:
我找到了解决方案,解决方案是使用lazy关键字,我们提供了一个在运行流后执行的功能。
我将保留这个问题,以防万一有更好的方法或其他人有同样的问题
@Test def defer(): Unit = {
var range = 0 to 10
val graphs = Source.lazily(() => Source(range))
.to(Sink.foreach(println))
range = 10 to 20
graphs.run()
Thread.sleep(2000)
}
问候。
最简单的方法可能是Source.fromIterator(() => List(1).iterator)
或类似的东西。在Akka Streams API中,我们选择尝试保留最小的操作符集,因此有时您可能会遇到单行中可以实现相同的情况,但是没有直接对应的名称就像在这里推迟的情况一样。如果您认为这是一个很常见的事情,请通过github.com/akka/akka告诉我们,我们可以考虑将其添加为API。
请注意,还有fromFuture
和其他的,虽然没有直接相关可能是有用的,这取决于您的实际用例(尤其是与Promise结合使用时)。