重试后rxjava完成时可完成

问题描述 投票:0回答:1

我在Completable上使用retryWhen运算符,有没有办法告诉它从重试Flowable完成?像这样的东西 -

PublishSubject<?> retrySubject = PublishSubject.create();
public void someFunction() {
    someCompletable.retryWhen(new Function<Flowable<Throwable>, Publisher<?>>() {
        @Override
        public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
            return throwableFlowable.flatMap(throwable -> retrySubject.toFlowable(BackpressureStrategy.MISSING));
        }
    }).subscribe();
}

public void ignoreError(){
    retrySubject.onComplete();
}
rx-java2 reactivex
1个回答
1
投票

你不能通过给它一个空的源来阻止flatMap。此外,每个错误都会使越来越多的观察者订阅导致内存泄漏的主题。

使用takeUntil通过其他来源的帮助停止序列:

PublishProcessor<Throwable> stopProcessor = PublishProcessor.create();

source.retryWhen(errors -> 
    errors.takeUntil(
        stopProcessor
    )
    .flatMap(error -> Flowable.timer(1, TimeUnit.SECONDS))
)

stopProcessor.onComplete();

编辑如果要重复使用同一主题,可以禁止停止路径上的项目:

PublishProcessor<Integer> stopProcessor = PublishProcessor.create();

source.retryWhen(errors -> 
    errors.takeUntil(
        stopProcessor.ignoreElements().toFlowable()
    )
    .flatMap(error -> stopProcessor)
)

// retry
stopProcessor.onNext(1);

// stop
stopProcessor.onComplete();
© www.soinside.com 2019 - 2024. All rights reserved.