DirectProcessor和UnicastProcessor可以在不应该订阅上游Publisher的情况下进行订阅。为什么?

问题描述 投票:1回答:1

根据有关处理器的Project Reactor文档:

直接(DirectProcessor和UnicastProcessor):这些处理器可以仅通过直接用户操作来推送数据(称其接收者为方法直接)。

同步(EmitterProcessor和ReplayProcessor):这些处理器可以通过用户操作和订阅数据来推送数据上游Publisher,并同步耗尽它。

UnicastProcessor应该无法订阅上游Publisher。那里的文档提供了直接用户Sink调用的示例:

UnicastProcessor<String> hotSource = UnicastProcessor.create();

Flux<String> hotFlux = hotSource.publish()
                                .autoConnect()
                                .map(String::toUpperCase);

hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));

hotSource.onNext("blue");

但是我尝试直接将UnicastProcessor订阅到Publisher,并且它可以正常工作。如文档中所述,这应该是不可能的。我错过了什么东西吗?

在以下示例中,我毫无问题地将UnicastProcessor订阅到上游Flux

    val latch = CountDownLatch(20)

    val numberGenerator: Flux<Long> = counter(1000)
    val processor = UnicastProcessor.create<Long>()

    val connectableFlux = numberGenerator.subscribeWith(processor)

    connectableFlux.subscribe {
        logger.info("Element [{}]", it)
    }

    latch.await()

日志:

12:50:12.193 [main] INFO reactor.Flux.Map.1 - onSubscribe(FluxMap.MapSubscriber)
12:50:12.196 [main] INFO reactor.Flux.Map.1 - request(unbounded)
12:50:13.203 [parallel-1] INFO reactor.Flux.Map.1 - onNext(0)
12:50:13.203 [parallel-1] INFO com.codependent.Test - Element [0]
spring reactive-programming project-reactor
1个回答
1
投票

是,文档的这方面似乎已过时。甚至DirectProcessor都可以用作Subscriber,并将信号传播到其自己的用户。

© www.soinside.com 2019 - 2024. All rights reserved.