RxJava2中是否有doAfterSubscribe等效项?

问题描述 投票:0回答:1

我有一个观察到的流对所有请求的响应。我想在发出请求时创建一个可观察的过滤器,以便可以与多个订户共享输出。以下是一些示例代码。

    PublishSubject<String> publishSubject = PublishSubject.create();
    Observable<String> fooObservable = publishSubject.filter(value -> value.startsWith("foo"))
            .doOnSubscribe(disposable -> {
                publishSubject.onNext("foobar");
            })
            .replay(1)
            .refCount();

    fooObservable.subscribe(val -> log.info("A val : <{}>", val));

我使用PublishSubject作为我的模拟服务,因为有时该服务会立即返回响应。

我发现的是,因为在立即得到结果时没有当前订阅,所以我的fooObservable没有被填充。即,当我想查看时,我没有日志输出:

A val : <foobar>

请注意,使用此代码我得到的结果相同:

PublishSubject<String> publishSubject = PublishSubject.create();
Observable<String> fooObservable = publishSubject.filter(value -> value.startsWith("foo"))
        .replay(1)
        .refCount();

publishSubject.onNext("foobar");
fooObservable.subscribe(val -> log.info("A val : <{}>", val));

因此,问题在于fooObservable在订阅之后才订阅PublishSubject,>

第一次订阅fooObservable之后是否有办法立即运行代码?

我有一个观察到的流对所有请求的响应。我想在发出请求时创建一个可观察的过滤器,以便可以与多个订户共享输出。 ...

java rx-java rx-java2
1个回答
0
投票

我认为您需要publish()connect()Here您可以了解更多有关它的信息。

© www.soinside.com 2019 - 2024. All rights reserved.