我有一个观察到的流对所有请求的响应。我想在发出请求时创建一个可观察的过滤器,以便可以与多个订户共享输出。以下是一些示例代码。
PublishSubject<String> publishSubject = PublishSubject.create();
Observable<String> fooObservable = publishSubject.filter(value -> value.startsWith("foo"))
.doOnSubscribe(disposable -> {
publishSubject.onNext("foobar");
})
.replay(1)
.refCount();
fooObservable.subscribe(val -> log.info("A val : <{}>", val));
我使用PublishSubject作为我的模拟服务,因为有时该服务会立即返回响应。
我发现的是,因为在立即得到结果时没有当前订阅,所以我的fooObservable没有被填充。即,当我想查看时,我没有日志输出:
A val : <foobar>
请注意,使用此代码我得到的结果相同:
PublishSubject<String> publishSubject = PublishSubject.create();
Observable<String> fooObservable = publishSubject.filter(value -> value.startsWith("foo"))
.replay(1)
.refCount();
publishSubject.onNext("foobar");
fooObservable.subscribe(val -> log.info("A val : <{}>", val));
因此,问题在于fooObservable在订阅之后才订阅PublishSubject,>
第一次订阅fooObservable之后是否有办法立即运行代码?
我有一个观察到的流对所有请求的响应。我想在发出请求时创建一个可观察的过滤器,以便可以与多个订户共享输出。 ...
我认为您需要publish()
和connect()
。 Here您可以了解更多有关它的信息。