动态发布者集合都通过相同的流量发出

问题描述 投票:1回答:1

我正在尝试构建一种可以通过热通量(输出)发出的集线器服务,但是您也可以注册/取消注册通量生产者/发布者(输入)

我知道我可以做类似的事情:

    class Hub<T> {
        /**
         * @return unregister function
         */
        Function<Void, Void> registerProducer(final Flux<T> flux) { ... }

        Disposable subscribe(Consumer<? super T> consumer) {
            if (out == null) { 
                // obviously this will not work!
                out = Flux.merge(producer1, producer2, ...).share();
            }
            return out;
        }
    }

...但是,由于这些“生产者”已注册和未注册,我如何为现有的已订阅通量添加新的通量源?或从中删除未注册的来源?

TIA!

spring-webflux project-reactor reactor
1个回答
0
投票

[Flux在设计上是不可变的,因此正如您所暗示的那样,没有办法只是就地“更新”现有的Flux

通常,我建议避免直接使用Processor。但是,这是[罕见的]情况之一,其中Processor可能是唯一的明智选择,因为您本质上想根据要注册的生产者动态发布元素。类似于:

class Hub<T> {

    private final FluxProcessor<T, T> processor;
    private final FluxSink<T> sink;

    public Hub() {
        this.processor = DirectProcessor.<T>create().serialize();
        this.sink = processor.sink();
    }

    public Disposable registerProducer(Flux<T> flux) {
        return flux.subscribe(sink::next);
    }

    public Flux<T> read() {
        return processor;
    }
}

如果要删除生产者,则可以跟踪从Disposable返回的registerProducer(),并在完成后对其调用dispose()

© www.soinside.com 2019 - 2024. All rights reserved.