我有一个
Flux
元素,我想记录一条消息,以防它不包含任何元素。我尝试了这个,但它没有按预期工作。该异常没有被我的消费者“捕获”,而是由 Reactor 的默认处理程序处理
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
public class GenericTest {
@Test
@SneakyThrows
void test() {
Flux.empty()
.switchIfEmpty(Mono.error(new IllegalStateException()))
.onErrorContinue((t, o) -> log.warn("Flux is empty"))
.subscribe();
Thread.sleep(5000);
}
}
ERROR reactor.core.publisher.Operators -- Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException
Caused by: java.lang.IllegalStateException: null
at com.example.dynamicgateway.misc.GenericTest.test(GenericTest.java:14)
我如何实现我的目标?
您可以使用
Mono.defer()
“捕获”空通量,记录,然后让它继续移动
@Test
@SneakyThrows
void test() {
Flux.empty()
.switchIfEmpty(Flux.defer(() -> {
log.warn("Flux is empty");
return Flux.empty();
}))
.subscribe(e -> {});
Thread.sleep(2000);
}
或者你可以切换到错误
Publisher
,然后让订阅者处理它。如果您链接订阅者,它可以记录错误消息
@Test
@SneakyThrows
void test() {
Flux.empty()
.switchIfEmpty(Flux.defer(() -> Flux.error(new IllegalStateException("Flux empty"))))
.subscribe(e -> {}, t -> log.warn(t.getMessage()));
Thread.sleep(2000);
}