Spring Reactor:Optional的对应类是什么 ?

问题描述 投票:0回答:2

所以我有一个Flux<Foo>,我想将每个Foo映射到Baz。问题是,getBaz(Foo foo)可能会抛出一个IOException

因此我想到了Mono<Baz> getBazRx(Foo foo)方法,如果出现异常,它将返回Mono.just(baz)Mono.empty()

然后将最终与Flux<Mono<Baz>>哪种提醒Optional<T>容器。

这是在Spring Reactor中这样做的吗?如何正确食用?

java spring reactive-programming project-reactor reactor
2个回答
2
投票

在反应流中,“选项”通常通过从流中移除缺少的元素来处理(例如,空的Mono,或者丢弃元素的Flux),而不是使用Flux<Optional>Mono<Optional>Flux<Mono>

调用同步getBaz方法时,可以使用单个.handle操作,如下所示:

flux
    .handle((foo, sink) -> {
        try {
            // propagate Baz down the stream
            sink.next(getBaz(foo));
        } catch (IOException e) {
            // Since sink.next is not called here,
            // the problematic element will be dropped from the stream
            log.error(e);
        }
    })

当调用异步getBazRx方法(返回Mono)时,你可以在onErrorResume / flatMap / flatMapSequential中使用concatMap,如下所示:

flux
    .flatMap(foo -> getBazRx(foo)
        .onErrorResume(t -> {
            log.error(t);
            return Mono.empty();
        }))

(或者你可以在.onErrorResume中移动.getBazRx,具体取决于你想要捕获的位置并忽略异常)

此外,既然你在你的问题中提到它...如果你要创建包装getBazRxgetBaz,如果getBaz有可能阻止你,你就不应该做这样的事情:

Mono<Baz> getBazRx(Foo foo) {
    // BAD!!!
    try {
        return Mono.just(getBaz(foo));
    } catch (IOException e) {
        return Mono.error(e)  // or Mono.empty() if you want to ignore
    }
}

该实现实际上只是模拟异步方法的同步方法。它有两个问题:

  1. 工作立即完成,而不是在订阅返回的Mono之后
  2. 如果getBaz阻止,你可能最终阻止事件循环

相反,你应该推迟工作,直到订阅mono,并在用于阻塞操作的Scheduler上运行任何阻塞操作,如下所示:

Mono<Baz> getBazRx(Foo foo) {
    return Mono.fromSupplier(() -> {
            try {
                return getBaz(foo);
            } catch (IOException e) {
                throw Exceptions.propagate(e);  // or return null to ignore and complete empty
            }
        })
        .subscribeOn(Schedulers.elastic());  // run on a scheduler suitable for blocking work
}

0
投票

由于您想跳过错误(例如只记录它),您可以使用onErrorContinue。此外,由于getBaz抛出一个检查异常,我们需要捕捉它和return(而不是扔)一个RuntimeException而不是。 Reactor有一个实用方法来做这个Exceptions.propagate

flux
  .map(foo -> {
      try {
        return getBaz(foo);
      } catch (IOException e) {
        return Exceptions.propagate(e);
      }
  })
  .onErrorContinue(RuntimeException.class, (t, b) -> log.error(t))
  .subscribe(baz -> log.info("Read value {}", baz));
© www.soinside.com 2019 - 2024. All rights reserved.