在Akka Streams可观察到延迟

问题描述 投票:2回答:1

我来自ReactiveX,我们有运营商推迟,以便创建一个Observable并在我们有订户后获得排放值。

在Akka Streams,我想知道是否存在类似的东西:

 @Test def defer(): Unit = {
    var range = 0 to 10
    val graphs = Source(range)
      .to(Sink.foreach(println))
    range = 10 to 20
    graphs.run()
    Thread.sleep(2000)
  }

拥有此代码,甚至在我们执行run()之前,更改范围的值,由于蓝图已经创建,因此值不会更改,并且会发出0到10。

有没有像Akka Streams中的Observable.defer

解:

我找到了解决方案,解决方案是使用lazy关键字,我们提供了一个在运行流后执行的功能。

我将保留这个问题,以防万一有更好的方法或其他人有同样的问题

  @Test def defer(): Unit = {
    var range = 0 to 10
    val graphs = Source.lazily(() => Source(range))
      .to(Sink.foreach(println))
    range = 10 to 20
    graphs.run()
    Thread.sleep(2000)
  }

问候。

akka rx-java observable akka-stream reactivex
1个回答
1
投票

最简单的方法可能是Source.fromIterator(() => List(1).iterator)或类似的东西。在Akka Streams API中,我们选择尝试保留最小的操作符集,因此有时您可能会遇到单行中可以实现相同的情况,但是没有直接对应的名称就像在这里推迟的情况一样。如果您认为这是一个很常见的事情,请通过github.com/akka/akka告诉我们,我们可以考虑将其添加为API。

请注意,还有fromFuture和其他的,虽然没有直接相关可能是有用的,这取决于您的实际用例(尤其是与Promise结合使用时)。

© www.soinside.com 2019 - 2024. All rights reserved.