Project Reactor的Flux在处理错误期间失败

问题描述 投票:1回答:1

我需要处理Flux流错误,为此我需要知道究竟什么项失败。似乎方法doOnError应该适合处理错误,但是我只能获得异常而不是失败的项目。有没有办法让失败的项目和例外?

private void testFluxIterableFlow() {
    Flux.fromIterable(Arrays.asList(1, 2, 3, 4, 5))
            .map(this::process)
            .doOnError(ex -> {
                ...
            })
            .doOnNext(processedValue -> ...)
            .subscribe();
}

private String process(Integer value) {
    if (value == 4) {
        throw new RuntimeException("error...");
    }
    return "processed " + value;
}

在这个例子中,我需要在错误处理程序中接收失败的项目4和异常消息。

java reactive-programming project-reactor
1个回答
1
投票

如果值导致异常,则应将该值视为异常原因。

由您来决定是否值得添加新的异常类型,

@RequiredArgsConstructor
public class WrongValueInStreamException extends RuntimeException {
    @Getter
    private final Object wrongValue;
}

public class StreamProcessor {
    public String process(Integer value) {
        if (value == 4) {
            throw new WrongValueInStreamException(4);
        }
        return "processed " + value;
    }
}

但只要例外传达了有用和相关的信息,这是一个很好的做法:

.doOnError(WrongValueInStreamException.class::isInstance, e -> {
    final Object value = ((WrongValueInStreamException) e).getWrongValue();
    // use 'value'
})
© www.soinside.com 2019 - 2024. All rights reserved.