RxJava观察室在插入时被触发多次

问题描述 投票:0回答:1

我在我的存储库实现中遇到了一个奇怪的问题。每次我调用应该从数据库获取数据并通过网络调用更新数据库的函数时,都会从数据库观察器收到多个结果。

override fun getApplianceControls(
    serialNumber: SerialNumber
): Flowable<ApplianceControlState> {
    val subject = BehaviorProcessor.create<ApplianceControlState>()

    controlsDao.get(serialNumber.serial)
        .map { controls ->
            ApplianceControlState.Loaded(controls.toDomainModel())
        }
        .subscribe(subject)

    controlApi.getApplianceControls(serialNumber.serial)
        .flatMapObservable<ApplianceControlState> { response ->
            val entities = response.toEntity(serialNumber)
            // Store the fetched controls on the database.
            controlsDao.insert(entities).andThen(
                // Return an empty observable because the db will take care of emitting latest values.
                Observable.create { }
            )
        }
        .onErrorResumeNext { error: Throwable ->
            Observable.create { emitter -> emitter.onNext(ApplianceControlState.Error(error)) }
        }
        .subscribeOn(backgroundScheduler)
        .subscribe()


    return subject.distinctUntilChanged()
}
@Dao
interface ApplianceControlsDao {

    @Insert(onConflict = OnConflictStrategy.REPLACE)
    fun insert(controls: List<TemperatureControlEntity>): Completable

    @Query("SELECT * FROM control_temperature WHERE serial = :serial")
    fun get(serial: String): Flowable<List<TemperatureControlEntity>>
}

[基本上,如果我一次调用getApplianceControls,则会得到理想的结果。然后,我再次拨打另一个序列号,该序列号为空,然后得到空数组。 但是我第三次打电话,但是序列号与第一次相同,所以在进行插入调用后,我得到了正确结果和空数组的混合。

喜欢这个:

第一个呼叫,至序列号“ 123”->已加载([control1,control2,control3])

第二次呼叫,至序列号“ 000”->已加载([]]

第3次调用,至序列号“ 123”->已加载([control1,control2,control3]),已加载([]),已加载([control1,control2,control3])]

如果我从api响应中删除数据库插入,则可以正常工作。在调用insert之后,一切都会发生。

编辑:从getApplianceControls()中调用ViewModel

fun loadApplianceControls(serialNumber: SerialNumber) {
    Log.i("Loading appliance controls")

    applianceControlRepository.getApplianceControls(serialNumber)
        .subscribeOn(backgroundScheduler)
        .observeOn(mainScheduler)
        .subscribeBy(
            onError = { error ->
                Log.e("Error $error")
            },
            onNext = { controlState ->
                _controlsLiveData.value = controlState  
            }
        ).addTo(disposeBag)
}
android rx-java rx-java2 android-room
1个回答
0
投票

正如我在评论中提到的那样,您有2个未在任何地方取消订阅的订阅,这可能会导致内存泄漏(当处置主题时,它不会处置),而且使用这种实现,您也会忽略API错误。我会尝试将其更改为:

override fun getApplianceControls(serialNumber: SerialNumber): Flowable<ApplianceControlState> {

    val dbObservable = controlsDao.get(serialNumber.serial)
        .map { controls ->
            ApplianceControlState.Loaded(controls.toDomainModel())
        }

    val apiObservable = controlApi.getApplianceControls(serialNumber.serial)
        .map { response ->
            val entities = response.toEntity(serialNumber)
           // Store the fetched controls on the database.
           controlsDao.insert(entities).andThen( Unit )
        }

    return Observables.combineLatest(dbObservable, apiObservable) { dbData, _ -> dbData }
        // apiObservable emits are ignored, but it will by subscribed with dbObservable and Errors are not ignored 
        .onErrorResumeNext { error: Throwable ->
            Observable.create { emitter -> emitter.onNext(ApplianceControlState.Error(error)) }
        }
        .subscribeOn(backgroundScheduler)
        //observeOn main Thread
        .distinctUntilChanged()
}

我不确定是否能解决原始问题。但是,如果是这样-问题出在flatMapObservable中另请参阅controlApi.getApplianceControls()的实现。

© www.soinside.com 2019 - 2024. All rights reserved.