使用Flux连续减少状态

问题描述 投票:0回答:1

假设我有两种事件类型(AB)和Fluxes以某种方式生成它们:

Flux<A> aFlux = ...;
Flux<B> bFlux = ...;

还有一个类型,它保存当前状态由S类型表示:

class S {
  final int val;
}

我想创建以下内容:

final S sInitial = ...;

Flux<S> sFlux = Flux.merge(aFlux, bFlux)
  .scan((a, e) -> {
    if(e instanceof A) {
      return mapA(a, (A)e);
    } else if(e instanceof B) {
      return mapB(a, (B)e);
    } else {
      throw new RuntimeException("invalid event");
    }
  })
  .startWith(sInitial);

其中sCurr是qFxswpoi的实例,最后由sFlux输出,从S开始,sInitial / mapA返回mapB类型的新值。 SS都是不变的。

也就是说,我想:

  • 连续输出最新状态......
  • ......正在生成......
  • ...基于当前状态和收到的事件......
  • ...按照映射器函数的规定

有没有办法以其他方式重组上面的流流,特别是为了避免使用sInitial

java project-reactor
1个回答
0
投票

您可以添加界面并为A和B类实现它

 aFlux = ...; Flux bFlux = ...; and also a type that holds the current state denoted by type S:...

现在你可以使用interface ToSConvertible { S toS(S s); } 方法:

reactor.core.publisher.Flux#scan(A, java.util.function.BiFunction<A,? super T,A>)
© www.soinside.com 2019 - 2024. All rights reserved.