为什么 Flux.share() 不共享其订阅?

问题描述 投票:0回答:1

我想分享(即分割)我的通量,但

share()
似乎不会导致我的订阅被共享。为什么?

我有一个由昂贵的数据库调用发出的

Flux
。我想分割该通量并以不同的方式处理它产生的值(但not使用
groupBy()
运算符)。后来,我想再次组合不同的路径,这样我只需订阅一次(例如通过 REST 控制器):

    static Flux<String> expensiveDatabaseCall() { // simulates the DB call and shares the result
        return Flux.generate( 
                       () -> {
                           System.out.println("subscribed"); // should only happen once
                           return 0;
                       },
                       (state, sink) -> {
                           sink.next(state);
                           return state + 1;
                       })
                   .map(String::valueOf)
                   .log()
                   .delayElements(Duration.ofSeconds(1))
                   .take(2)
                   .share() // share the flux so the DB is only queried once
            ;
    }

    static Flux<String> pathA() {
        // complicated calculations
        return expensiveDatabaseCall().doOnNext(it -> System.out.println("a: " + it));
    }

    static Flux<String> pathB() {
        // different, equally complicated calculations
        return expensiveDatabaseCall().doOnNext(it -> System.out.println("b: " + it));
    }

    static Flux<String> controller() { // pretend this happens in a REST controller
        return Flux.merge(pathA(), pathB());
    }

    @Test
    void test() {
        StepVerifier.create(controller()).expectNextCount(4).verifyComplete();
    }

由于我使用的是

share()
运算符,我本以为只会看到一个订阅 - 但实际上我看到了两个
"subscribed"
。为什么?

share()
操作符不是应该订阅上游并自行处理所有下游订阅,而不是将它们传递回上游源吗?

这至少是我对docs的理解;他们是这么说的:

返回一个新的 Flux,它多播(共享)原始 Flux。 [...]

reactive-programming project-reactor flux multicasting
1个回答
0
投票

解释非常简单:任何缓存/共享行为都绑定到通量实例。这意味着当你这样做时:

public Flux<Integer> sharedCountdown() {
    return Flux.just(3, 2, 1, 0).share();
}

var instanceA = sharedCountdown();
var instanceB = sharedCountdown();

您创建两个不同的通量,每个通量都有自己的订阅和缓存。这与任何其他普通 Java 对象完全相同。

如果要共享,则必须在下游处理中使用相同的实例。

就你而言,你必须颠倒逻辑。 您的后处理器不应该自己调用/创建昂贵的通量,它们应该接收组装的实例作为输入。另外,如果您想确保所有下游处理器接收所有上游信号,您应该避免

share()
,而使用 publish().autoconnect(numberOfPostProcessors):

    static Flux<String> expensiveDatabaseCall() { // simulates the DB call and shares the result
        return Flux.generate( 
                       () -> {
                           System.out.println("subscribed"); // should only happen once
                           return 0;
                       },
                       (state, sink) -> {
                           sink.next(state);
                           return state + 1;
                       })
                   .map(String::valueOf)
                   .log()
                   .delayElements(Duration.ofSeconds(1))
                   .take(2)
            ;
    }

    static Flux<String> pathA(Flux<String> upstreamFlux) {
        // complicated calculations
        return upstreamFlux.doOnNext(it -> System.out.println("a: " + it));
    }

    static Flux<String> pathB(Flux<String> upstreamFlux) {
        // different, equally complicated calculations
        return upstreamFlux.doOnNext(it -> System.out.println("b: " + it));
    }

    static Flux<String> controller() { // pretend this happens in a REST controller
        var sharedExpensiveUpstream = expensiveDatabaseCall().publish().autoconnect(2);
        return Flux.merge(pathA(sharedExpensiveUpstream), pathB(sharedExpensiveUpstream));
    }
© www.soinside.com 2019 - 2024. All rights reserved.