我正在使用项目reactor探索反应流,并且我有一个用例,如果在处理当前事件期间发生错误(例如,反序列化错误),我需要跳到下一个事件。
对于这样的用例,我发现我可以使用
onErrorContinue
运算符,但我也考虑过用 try catch
包装我的回调,如下所示:
Flux.just(1, 2, 3, 4, 5).mapNotNull(item -> {
try {
System.out.println("item = " + item);
if (item.equals(3)) throw new IllegalArgumentException(); // deserialization issue for example
return item;
} catch (Exception exception){
System.out.println(exception);
return null;
}
}).subscribe(item -> System.out.println("got element :" + item));
在引入 onErrorContinue
运算符之前,我发现了这个旧的
问题,其中有人抱怨必须将回调包装在 try catch 中才能解决问题,如我在示例中所示。
所以我想知道除了坚持函数式反应式编程风格之外,是否还有理由更喜欢
onErrorContinue
而不是第二种方法?如果 try-catch 方法有问题?
在反应流中,当发出错误信号时,发布者被视为已终止:
终端状态:对于
:当Publisher
或onComplete
已发出信号时。对于onError
:收到Subscriber
或onComplete
时。onError
为了防止Reactor中的
Publisher
终止,您可以根据您的需求try/catch
或使用onErrorContinue
。
在我看来,
try/catch
没有任何问题,有时像下面的场景,这是最好的解决方案,因为onErrorContinue
会跳过所有错误事件,有时你会在同一个链中处理终端和非终端异常运算符,例如:
Flux.just(1, 2, 3, 4, 5, 6, 7)
.map(item -> {
System.out.println("item = " + item);
if (item.equals(5)) {
throw new IllegalArgumentException("Terminal event"); // terminal exception
}
return item;
})
.mapNotNull(item -> {
System.out.println("item = " + item);
if (item.equals(3)) {
throw new IllegalArgumentException("Non-terminal event"); // non-terminal exception
}
return item;
})
.onErrorContinue((error, item) -> {
System.out.println(error.getMessage());
}).subscribe(item -> System.out.println("got element :" + item));
在这种情况下,抛出的两个异常都是同一类型,但其中一个被视为终止事件。我认为这个问题的正确解决方案是在
try/catch
中使用 flatMap
而不是 onErrorContinue
:
Flux.just(1, 2, 3, 4, 5, 6, 7)
.map(item -> {
System.out.println("item = " + item);
if (item.equals(5)) {
throw new IllegalArgumentException("Terminal event"); // terminal exception
}
return item;
})
.flatMap(item -> {
System.out.println("item = " + item);
try {
if (item.equals(3)) {
throw new IllegalArgumentException("Non-terminal event"); // non-terminal exception
}
return Flux.just(item);
} catch (IllegalArgumentException e) {
return Flux.empty();
}
})
.subscribe(item -> System.out.println("got element :" + item));
这样,终止事件将结束
Publisher
,非终止事件将被处理,如果出现异常,则发出 Flux.empty()
。