操纵ConnectableObserver的数据

问题描述 投票:0回答:2

我在我的websocket中使用connectableObserver,我需要操纵对象将其转换为另一个对象,但我似乎无法使用flatMap()。这是代码:

@Override
public ConnectableObservable<Object> getWebSocketObservable() {
    return getDataManager().getWebSocketObservable();
}

getWebSocketObservable返回ConnectableObservable<Pair<Boolean,String>>。我想操纵它成为另一个对象(或者在这种情况下只是对象)。并在旁注。在该片段中,它不接受返回值,即使它只需要一个通用的“对象”。谁可以给我解释一下这个?

java android rx-java2
2个回答
0
投票

ConnectableObservable作为源返回非常罕见,因为在第一次转换后会立即丢失此类型。让客户在常规的publish()上使用share()Observable代替:

@Override
public Observable<Object> getWebSocketObservable() {
    return getDataManager().getWebSocketObservable()
        .cast(Object.class);
}

ConnectableObservable<Object> obs = getWebSocketObservable()
    .publish();

obs.subscribe(System.out::println, Throwable::printStackTrace);
obs.subscribe(System.out::println, Throwable::printStackTrace);

obs.connect();    

另请注意,与C#不同,Java在类型签名中没有明确的共同和逆变的概念,因此即使Observable<T>,也不能将Observable<Object>向下转换为T extends Object。在这种情况下,您可以尝试cast()或通过(Observable)obs强制转换,然后@SuppressWarnings未经检查和原始类型警告


-1
投票

ConnectableObservable最初不会发出任何内容。当调用connect时,它将为其source observable(我们称之为发布的那个)创建一个新的订阅。它将开始接收事件并将其推送给其订阅者。所有订阅者将同时收到相同的事件,因为它们实际上共享相同的订阅:连接创建的订阅。

ConnectableObservable<Long> cold = Observable.interval(200, 
TimeUnit.MILLISECONDS).publish();
cold.connect();

cold.subscribe(i -> System.out.println("First: " + i));
cold.subscribeOn(Schedulers.io())
// Be notified on the main thread
cold.observeOn(AndroidSchedulers.mainThread())
// Manipulation logic can replace this line
cold.map(longVar -> return Utils.manupulationOfConnectableObservable(input))
cold.subscribe(i -> System.out.println("Second: " + i));

根据要求实现Utils.manupulationOfConnectableObservable(输入)。

ConnectableObservable类似于普通的Observable,

除了它在订阅时没有开始发出项目,但只有在调用connect()方法时才开始。通过这种方式,您可以在Observable开始发出项目之前等待所有预期的Subscribers到Observable的Observable.subscribe()。

正如在docs中提到的那样,A ConnectableObservable类似于普通的Observable。所以我们可以应用可以应用于Observable的相同RxJava运算符。

© www.soinside.com 2019 - 2024. All rights reserved.