如果 Flux 为空,如何传递 Consumer/Runnable?

问题描述 投票:0回答:1

我有一个

Flux
元素,我想记录一条消息,以防它不包含任何元素。我尝试了这个,但它没有按预期工作。该异常没有被我的消费者“捕获”,而是由 Reactor 的默认处理程序处理

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j
public class GenericTest {
    @Test
    @SneakyThrows
    void test() {
        Flux.empty()
                .switchIfEmpty(Mono.error(new IllegalStateException()))
                .onErrorContinue((t, o) -> log.warn("Flux is empty"))
                .subscribe();
        Thread.sleep(5000);
    }
}
ERROR reactor.core.publisher.Operators -- Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException
Caused by: java.lang.IllegalStateException: null
    at com.example.dynamicgateway.misc.GenericTest.test(GenericTest.java:14)

我如何实现我的目标?

java project-reactor
1个回答
0
投票

您可以使用

Mono.defer()
“捕获”空通量,记录,然后让它继续移动

    @Test
    @SneakyThrows
    void test() {
        Flux.empty()
                .switchIfEmpty(Flux.defer(() -> {
                    log.warn("Flux is empty");
                    return Flux.empty();
                }))
                .subscribe(e -> {});
        Thread.sleep(2000);
    }

或者你可以切换到错误

Publisher
,然后让订阅者处理它。如果链接订阅者,它可以记录错误消息

    @Test
    @SneakyThrows
    void test() {
        Flux.empty()
                .switchIfEmpty(Flux.defer(() -> Flux.error(new IllegalStateException("Flux empty"))))
                .subscribe(e -> {}, t -> log.warn(t.getMessage()));
        Thread.sleep(2000);
    }
© www.soinside.com 2019 - 2024. All rights reserved.