根据有关处理器的Project Reactor文档:
直接(DirectProcessor和UnicastProcessor):这些处理器可以仅通过直接用户操作来推送数据(称其接收者为方法直接)。
同步(EmitterProcessor和ReplayProcessor):这些处理器可以通过用户操作和订阅数据来推送数据上游Publisher,并同步耗尽它。
UnicastProcessor
应该无法订阅上游Publisher
。那里的文档提供了直接用户Sink调用的示例:
UnicastProcessor<String> hotSource = UnicastProcessor.create();
Flux<String> hotFlux = hotSource.publish()
.autoConnect()
.map(String::toUpperCase);
hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: "+d));
hotSource.onNext("blue");
但是我尝试直接将UnicastProcessor
订阅到Publisher
,并且它可以正常工作。如文档中所述,这应该是不可能的。我错过了什么东西吗?
在以下示例中,我毫无问题地将UnicastProcessor
订阅到上游Flux
:
val latch = CountDownLatch(20)
val numberGenerator: Flux<Long> = counter(1000)
val processor = UnicastProcessor.create<Long>()
val connectableFlux = numberGenerator.subscribeWith(processor)
connectableFlux.subscribe {
logger.info("Element [{}]", it)
}
latch.await()
日志:
12:50:12.193 [main] INFO reactor.Flux.Map.1 - onSubscribe(FluxMap.MapSubscriber)
12:50:12.196 [main] INFO reactor.Flux.Map.1 - request(unbounded)
12:50:13.203 [parallel-1] INFO reactor.Flux.Map.1 - onNext(0)
12:50:13.203 [parallel-1] INFO com.codependent.Test - Element [0]
是,文档的这方面似乎已过时。甚至DirectProcessor
都可以用作Subscriber
,并将信号传播到其自己的用户。