所以我知道这已经被问过很多次了,但是我尝试了很多事情,但是似乎没有任何效果。
让我们从这些博客/文章/代码开始:
以及许多其他人。
简而言之,它们都描述了如何使用retryWhen来实现指数补偿。像这样的东西:
source
.retryWhen(
errors -> {
return errors
.zipWith(Observable.range(1, 3), (n, i) -> i)
.flatMap(
retryCount -> {
System.out.println("retry count " + retryCount);
return Observable.timer((long) Math.pow(1, retryCount), SECONDS);
});
})
即使库中的文档也同意:https://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/rxjava3/core/Observable.java#L11919。
但是,我已经尝试过此操作,并且做了一些非常相似的变体,在这里不值得描述,而且似乎没有任何效果。示例中有一种方法可以正常工作,并且正在使用阻止订阅者,但我想避免阻止线程。
因此,如果对先前的可观察对象,我们像这样应用阻塞用户:
.blockingForEach(System.out::println);
它按预期工作。但这不是主意。如果我们尝试:
.subscribe(
x -> System.out.println("onNext: " + x),
Throwable::printStackTrace,
() -> System.out.println("onComplete"));
该流程只运行一次,因此不是我想要实现的。
这是否意味着它不能像我正在尝试的那样使用?从文档看来,尝试满足我的要求似乎不是问题。
知道我想念什么吗?
TIA。
编辑:我有2种测试方法:
一种测试方法(使用testng):
Observable<Integer> source =
Observable.just("test")
.map(
x -> {
System.out.println("trying again");
return Integer.parseInt(x);
});
source
.retryWhen(
errors -> {
return errors
.zipWith(Observable.range(1, 3), (n, i) -> i)
.flatMap(
retryCount -> {
return Observable.timer((long) Math.pow(1, retryCount), SECONDS);
});
})
.subscribe(...);
从Kafka使用者(使用Spring Boot):
这只是对观察者的订阅,但是重试逻辑是我之前在帖子中描述的。
@KafkaListener(topics = "${kafka.config.topic}")
public void receive(String payload) {
log.info("received payload='{}'", payload);
service
.updateMessage(payload)
.subscribe(...)
.dispose();
}
您的代码的主要问题是Observable.timer is by default operating on the computation scheduler。在尝试验证测试中的行为时,这会增加工作量。
这里有一些单元测试代码,可验证您的重试代码是否确实在重试。
它使用TestScheduler而不是计算调度程序,因此我们可以假装通过advanceTimeBy及时移动。
TestScheduler testScheduler = new TestScheduler();
AtomicInteger counter = new AtomicInteger();
Observable<Integer> source =
Observable.just("test")
.map(
x -> {
System.out.println("trying again");
counter.getAndIncrement();
return Integer.parseInt(x);
});
TestObserver<Integer> testObserver = source
.retryWhen(
errors -> {
return errors
.zipWith(Observable.range(1, 3), (n, i) -> i)
.flatMap(
retryCount -> {
return Observable.timer((long) Math.pow(1, retryCount), SECONDS, testScheduler);
});
})
.test();
assertEquals(1, counter.get());
testScheduler.advanceTimeBy(1, SECONDS);
assertEquals(2, counter.get());
testScheduler.advanceTimeBy(1, SECONDS);
assertEquals(3, counter.get());
testScheduler.advanceTimeBy(1, SECONDS);
assertEquals(4, counter.get());
testObserver.assertComplete();