如何创建一个Flowable,在其最后一个订阅者取消其订阅后立即完成

问题描述 投票:0回答:1

我有一个案例,我需要建立一个由多个元素组成的反应流,可以用两种不同的方式完成。

我看到的链条看起来有点像这样:

在链的顶端,我有多个Flowable生成器,每个生成器定期轮询不同的远程资源,并在获得新结果时发出消息。

这些生产者中的每一个都有一个中间用户,可以进一步转换消息。

反过来,所有这些中间订阅者都拥有相同的root订阅者,将每个人的转换事件合并为一个Flowable

我想要的是能够取消root用户的任何订阅,并且该取消订阅事件基本上导致un-subscribee(是一个单词?)来完成。

换句话说,如果没有人在听,我希望我的投票过程停止。如果我需要重新启动轮询,我只需重建一个新的链。

是否有任何RxJava运算符允许我在生产者和订阅者之间创建这种关系?如果没有,最简单的方法是什么?

我现在有一种解决方案,但它不是一个非常好的解决方案,并且需要链中每个元素的大量样板代码。它看起来有点像:

private class AutoUnsubscribing {
    private final AtomicReference<Subscription> upstreamSubscription = new AtomicReference<>(null);
    private final PublishProcessor<LedgerEvent> publisher = PublishProcessor.create();

    public Flowable<LedgerEvent> getFlowable(Flowable<LedgerEvent> upstream) {
        upstream.subscribe(new Subscriber<LedgerEvent>() {
            @Override
            public void onSubscribe(Subscription s) {
                upstreamSubscription.set(s);
            }

            @Override
            public void onNext(LedgerEvent ledgerEvent) {
                publisher.onNext(ledgerEvent);
            }

            @Override
            public void onError(Throwable t) {
                publisher.onError(t);
            }

            @Override
            public void onComplete() {
                publisher.onComplete();
            }
        });

        return publisher.doOnCancel(() -> {
            if (!publisher.hasSubscribers()) {
                // no more subscribers. unsubscribe from upstream and complete this stream
                upstreamSubscription.get().cancel();
                upstreamSubscription.set(null);
                publisher.onComplete();
            }
        });
    }
}

但必须有一个比这更好的解决方案。

java reactive-programming rx-java2
1个回答
0
投票

现在我已经获得了更多RxJava的经验,我能够为我的问题制定一个答案。基本上,我们拥有的链条如下:

+----------+      +-------------+      +----------+
| consumer |←-----| transformer |←-----| producer |
+----------+      +-------------+      +----------+

真正需要的是一种让transformer可以取消的方法,当它被取消时,它对producer的订阅被取消(所以producer可以在终止之前做一些清理),并且transformer完成了自己(这样consumer将成为意识到这一点)。

我能找到的最佳解决方案是实现detachable运算符,如下所示:

public static <T> FlowableTransformer<T, T> detachable(
            Consumer<Disposable> disposableConsumer) {
    PublishProcessor<Object> stopper = PublishProcessor.create();
    Disposable disposable = new Disposable() {
        private AtomicBoolean disposed = new AtomicBoolean(false);

        @Override
        public void dispose() {
            if (disposed.compareAndSet(false, true)) {
                stopper.onNext(new Object());
            }
        }

        @Override
        public boolean isDisposed() {
            return disposed.get();
        }
    };

    return upstream -> upstream.takeUntil(stopper)
            .doOnSubscribe(s -> disposableConsumer.accept(disposable));

}

然后可以这样使用:

AtomicReference<Disposable> transformerDisposable = new AtomicReference<>();

producer
    .compose(detachable(transformerDisposable::set))
    .subscriber(consumer)

然后transfomer可以随意取消使用

transformerDisposable.dispose()

这确实会让producer称之为自己的onUnsubscribe,而consumer则会收到onComplete通知。

© www.soinside.com 2019 - 2024. All rights reserved.