我是RxJava和RxAndroidBle的新手,我很乐意帮助我解决一个问题。基本上,我有一个BLE设备,我订阅了四个特征。从这些可观测量发出的数据合并为一个可观察的:
private RxBleDevice mBleDevice;
private Disposable mConnectionSubscription;
...
mConnectionSubscription = mBleDevice.establishConnection(false)
.flatMap(rxBleConnection -> Observable.combineLatest(
rxBleConnection.setupNotification(UUID.fromString(CHARACTERISTIC_UUID0)).flatMap(observable -> observable),
rxBleConnection.setupNotification(UUID.fromString(CHARACTERISTIC_UUID1)).flatMap(observable -> observable),
rxBleConnection.setupNotification(UUID.fromString(CHARACTERISTIC_UUID2)).flatMap(observable -> observable),
rxBleConnection.setupNotification(UUID.fromString(CHARACTERISTIC_UUID3)).flatMap(observable -> observable),
MyDataClass::new
))
.observeOn(AndroidSchedulers.mainThread())
.doFinally(this::disconnect)
.subscribe(
myData -> {
this.onNotificationReceived(myData);
},
this::onNotificationSetupFailure
);
哪里:
public class MyDataClass<Data0, Data1, Data2, Data3> {
...
public MyDataClass(Data0 data0, Data1 data1, Data2 data2, Data3 data3) {
...
}
}
以上工作正常。我试图做的是订阅另一个特征。但是,这只会偶尔发出数据。因此,我想将这个额外的观察者连接到一个单独的观察者(比如onNotificationReceived2
)。我不希望这些额外的可观察数据与MyDataClass
相关。我该怎么做?
在你的flatMap()
之前,插入:.doOnNext( rxBleConnection -> savedConnection = rxBleConnection )
然后使用savedConnection
设置你的其他订阅。
处理(或丢失)原始连接时,请不要忘记将savedConnection
置零。
我找到了我追求的解决方案。根据this issue和Robert Lewis的评论,我最终做了以下事情:
private RxBleDevice mBleDevice;
private Disposable mConnectionSubscription1;
private Disposable mConnectionSubscription2;
...
Observable<RxBleConnection> sharedConnectionObservable = mBleDevice.establishConnection(false)
.replay(1).refCount();
mConnectionSubscription1 = sharedConnectionObservable
.flatMap(rxBleConnection -> Observable.combineLatest(
rxBleConnection.setupNotification(UUID.fromString(CHARACTERISTIC_UUID0)).flatMap(observable -> observable),
rxBleConnection.setupNotification(UUID.fromString(CHARACTERISTIC_UUID1)).flatMap(observable -> observable),
rxBleConnection.setupNotification(UUID.fromString(CHARACTERISTIC_UUID2)).flatMap(observable -> observable),
rxBleConnection.setupNotification(UUID.fromString(CHARACTERISTIC_UUID3)).flatMap(observable -> observable),
MyDataClass::new
))
.observeOn(AndroidSchedulers.mainThread())
.doFinally(this::disconnect)
.subscribe(
myData -> {
this.onNotificationReceived(myData);
},
this::onNotificationSetupFailure
);
mConnectionSubscription2 = sharedConnectionObservable
.flatMap(rxBleConnection -> rxBleConnection.setupNotification(UUID.fromString(CHARACTERISTIC_UUID4)).flatMap(observable -> observable))
.observeOn(AndroidSchedulers.mainThread())
.doFinally(this::disconnect)
.subscribe(
myData2 -> {
this.onNotificationReceived2(myData2);
},
this::onNotificationSetupFailure
);