我有一个案例,我需要建立一个由多个元素组成的反应流,可以用两种不同的方式完成。
我看到的链条看起来有点像这样:
在链的顶端,我有多个Flowable
生成器,每个生成器定期轮询不同的远程资源,并在获得新结果时发出消息。
这些生产者中的每一个都有一个中间用户,可以进一步转换消息。
反过来,所有这些中间订阅者都拥有相同的root订阅者,将每个人的转换事件合并为一个Flowable
我想要的是能够取消root用户的任何订阅,并且该取消订阅事件基本上导致un-subscribee(是一个单词?)来完成。
换句话说,如果没有人在听,我希望我的投票过程停止。如果我需要重新启动轮询,我只需重建一个新的链。
是否有任何RxJava运算符允许我在生产者和订阅者之间创建这种关系?如果没有,最简单的方法是什么?
我现在有一种解决方案,但它不是一个非常好的解决方案,并且需要链中每个元素的大量样板代码。它看起来有点像:
private class AutoUnsubscribing {
private final AtomicReference<Subscription> upstreamSubscription = new AtomicReference<>(null);
private final PublishProcessor<LedgerEvent> publisher = PublishProcessor.create();
public Flowable<LedgerEvent> getFlowable(Flowable<LedgerEvent> upstream) {
upstream.subscribe(new Subscriber<LedgerEvent>() {
@Override
public void onSubscribe(Subscription s) {
upstreamSubscription.set(s);
}
@Override
public void onNext(LedgerEvent ledgerEvent) {
publisher.onNext(ledgerEvent);
}
@Override
public void onError(Throwable t) {
publisher.onError(t);
}
@Override
public void onComplete() {
publisher.onComplete();
}
});
return publisher.doOnCancel(() -> {
if (!publisher.hasSubscribers()) {
// no more subscribers. unsubscribe from upstream and complete this stream
upstreamSubscription.get().cancel();
upstreamSubscription.set(null);
publisher.onComplete();
}
});
}
}
但必须有一个比这更好的解决方案。
现在我已经获得了更多RxJava的经验,我能够为我的问题制定一个答案。基本上,我们拥有的链条如下:
+----------+ +-------------+ +----------+ | consumer |←-----| transformer |←-----| producer | +----------+ +-------------+ +----------+
真正需要的是一种让transformer
可以取消的方法,当它被取消时,它对producer
的订阅被取消(所以producer
可以在终止之前做一些清理),并且transformer
完成了自己(这样consumer
将成为意识到这一点)。
我能找到的最佳解决方案是实现detachable
运算符,如下所示:
public static <T> FlowableTransformer<T, T> detachable(
Consumer<Disposable> disposableConsumer) {
PublishProcessor<Object> stopper = PublishProcessor.create();
Disposable disposable = new Disposable() {
private AtomicBoolean disposed = new AtomicBoolean(false);
@Override
public void dispose() {
if (disposed.compareAndSet(false, true)) {
stopper.onNext(new Object());
}
}
@Override
public boolean isDisposed() {
return disposed.get();
}
};
return upstream -> upstream.takeUntil(stopper)
.doOnSubscribe(s -> disposableConsumer.accept(disposable));
}
然后可以这样使用:
AtomicReference<Disposable> transformerDisposable = new AtomicReference<>();
producer
.compose(detachable(transformerDisposable::set))
.subscriber(consumer)
然后transfomer
可以随意取消使用
transformerDisposable.dispose()
这确实会让producer
称之为自己的onUnsubscribe
,而consumer
则会收到onComplete
通知。