我在我的websocket中使用connectableObserver,我需要操纵对象将其转换为另一个对象,但我似乎无法使用flatMap()。这是代码:
@Override
public ConnectableObservable<Object> getWebSocketObservable() {
return getDataManager().getWebSocketObservable();
}
getWebSocketObservable
返回ConnectableObservable<Pair<Boolean,String>>
。我想操纵它成为另一个对象(或者在这种情况下只是对象)。并在旁注。在该片段中,它不接受返回值,即使它只需要一个通用的“对象”。谁可以给我解释一下这个?
将ConnectableObservable
作为源返回非常罕见,因为在第一次转换后会立即丢失此类型。让客户在常规的publish()
上使用share()
或Observable
代替:
@Override
public Observable<Object> getWebSocketObservable() {
return getDataManager().getWebSocketObservable()
.cast(Object.class);
}
ConnectableObservable<Object> obs = getWebSocketObservable()
.publish();
obs.subscribe(System.out::println, Throwable::printStackTrace);
obs.subscribe(System.out::println, Throwable::printStackTrace);
obs.connect();
另请注意,与C#不同,Java在类型签名中没有明确的共同和逆变的概念,因此即使Observable<T>
,也不能将Observable<Object>
向下转换为T extends Object
。在这种情况下,您可以尝试cast()
或通过(Observable)obs
强制转换,然后@SuppressWarnings
未经检查和原始类型警告
ConnectableObservable最初不会发出任何内容。当调用connect时,它将为其source observable(我们称之为发布的那个)创建一个新的订阅。它将开始接收事件并将其推送给其订阅者。所有订阅者将同时收到相同的事件,因为它们实际上共享相同的订阅:连接创建的订阅。
ConnectableObservable<Long> cold = Observable.interval(200,
TimeUnit.MILLISECONDS).publish();
cold.connect();
cold.subscribe(i -> System.out.println("First: " + i));
cold.subscribeOn(Schedulers.io())
// Be notified on the main thread
cold.observeOn(AndroidSchedulers.mainThread())
// Manipulation logic can replace this line
cold.map(longVar -> return Utils.manupulationOfConnectableObservable(input))
cold.subscribe(i -> System.out.println("Second: " + i));
根据要求实现Utils.manupulationOfConnectableObservable(输入)。
ConnectableObservable类似于普通的Observable,
除了它在订阅时没有开始发出项目,但只有在调用connect()方法时才开始。通过这种方式,您可以在Observable开始发出项目之前等待所有预期的Subscribers到Observable的Observable.subscribe()。
正如在docs中提到的那样,A ConnectableObservable类似于普通的Observable。所以我们可以应用可以应用于Observable的相同RxJava运算符。