所以我有一个Flux<Foo>
,我想将每个Foo
映射到Baz
。问题是,getBaz(Foo foo)
可能会抛出一个IOException
。
因此我想到了Mono<Baz> getBazRx(Foo foo)
方法,如果出现异常,它将返回Mono.just(baz)
或Mono.empty()
。
然后将最终与Flux<Mono<Baz>>
哪种提醒Optional<T>
容器。
这是在Spring Reactor中这样做的吗?如何正确食用?
在反应流中,“选项”通常通过从流中移除缺少的元素来处理(例如,空的Mono
,或者丢弃元素的Flux
),而不是使用Flux<Optional>
,Mono<Optional>
或Flux<Mono>
。
调用同步getBaz
方法时,可以使用单个.handle
操作,如下所示:
flux
.handle((foo, sink) -> {
try {
// propagate Baz down the stream
sink.next(getBaz(foo));
} catch (IOException e) {
// Since sink.next is not called here,
// the problematic element will be dropped from the stream
log.error(e);
}
})
当调用异步getBazRx
方法(返回Mono
)时,你可以在onErrorResume
/ flatMap
/ flatMapSequential
中使用concatMap
,如下所示:
flux
.flatMap(foo -> getBazRx(foo)
.onErrorResume(t -> {
log.error(t);
return Mono.empty();
}))
(或者你可以在.onErrorResume
中移动.getBazRx
,具体取决于你想要捕获的位置并忽略异常)
此外,既然你在你的问题中提到它...如果你要创建包装getBazRx
的getBaz
,如果getBaz
有可能阻止你,你就不应该做这样的事情:
Mono<Baz> getBazRx(Foo foo) {
// BAD!!!
try {
return Mono.just(getBaz(foo));
} catch (IOException e) {
return Mono.error(e) // or Mono.empty() if you want to ignore
}
}
该实现实际上只是模拟异步方法的同步方法。它有两个问题:
Mono
之后getBaz
阻止,你可能最终阻止事件循环相反,你应该推迟工作,直到订阅mono,并在用于阻塞操作的Scheduler
上运行任何阻塞操作,如下所示:
Mono<Baz> getBazRx(Foo foo) {
return Mono.fromSupplier(() -> {
try {
return getBaz(foo);
} catch (IOException e) {
throw Exceptions.propagate(e); // or return null to ignore and complete empty
}
})
.subscribeOn(Schedulers.elastic()); // run on a scheduler suitable for blocking work
}
由于您想跳过错误(例如只记录它),您可以使用onErrorContinue
。此外,由于getBaz
抛出一个检查异常,我们需要捕捉它和return
(而不是扔)一个RuntimeException
而不是。 Reactor有一个实用方法来做这个Exceptions.propagate
:
flux
.map(foo -> {
try {
return getBaz(foo);
} catch (IOException e) {
return Exceptions.propagate(e);
}
})
.onErrorContinue(RuntimeException.class, (t, b) -> log.error(t))
.subscribe(baz -> log.info("Read value {}", baz));