使用RxJava flowable处理Android Room查询时提高性能

问题描述 投票:0回答:1

我的应用正在以尽可能高的采样率(在我的设备上为〜200 Hz)从加速度计收集传感器值,并将这些值保存在Room数据库中。我还想经常用最新的测量值更新一些图形,可以说每秒刷新5次。自从应用程序也以〜200 Hz收集线性加速度(无g)(因此,两个传感器各自在数据库中插入大约200Hz的值)时,我注意到应用程序性能大幅下降,并且滞后了几秒钟在收集的加速度值之间并显示在图中。从探查器中,我的猜测是RxComputationThread是瓶颈,因为由于Flowable使得RxComputationThread几乎始终处于活动状态。

我使用sample()来限制接收者更新,因为我的图形不需要超级频繁地更新。当我刚刚收集一个传感器时,这导致了可接受的性能。我看到RxJava提供了interval()方法来限制从发射器一侧发出的频率,但这对我来说似乎不可用? (未解决的参考)。

也许有人有一个想法如何提高性能?我总体上喜欢RxJava和Room的概念,并希望坚持使用它们,但是在这一点上,我几乎陷入了僵局。

这是我用来观察Room SQL表和更新图形的代码:

// Observe changes to the datasource and create a new subscription if necessary
sharedViewModel.dataSource.observe(viewLifecycleOwner, Observer { source ->
    Log.d("TAG", "Change observed!")
    when (source) {
        "acc" -> {
            val disposableDataSource =
                sharedViewModel.lastSecondsAccelerations
                    .sample(200, TimeUnit.MILLISECONDS)
                    .onBackpressureDrop()
                    .subscribeOn(Schedulers.io())
                    .subscribe { lastMeasurements ->
                        Log.d("TAG", Thread.currentThread().name)
                        if (sharedViewModel.isReset.value == true && lastMeasurements.isNotEmpty()) {
                            val t =
                                lastMeasurements.map { (it.time.toDouble() * 1e-9) - (lastMeasurements.last().time.toDouble() * 1e-9) }
                            val accX = lastMeasurements.map { it.accX.toDouble() }
                            val accY = lastMeasurements.map { it.accY.toDouble() }
                            val accZ = lastMeasurements.map { it.accZ.toDouble() }

                            // Update plots
                            updatePlots(t, accX, accY, accZ)
                        }
                    }
            compositeDisposable.clear()
            compositeDisposable.add(disposableDataSource)
        }
        "lin_acc" -> {
            val disposableDataSource =
                sharedViewModel.lastSecondsLinAccelerations
                    .sample(200, TimeUnit.MILLISECONDS)
                    .onBackpressureDrop()
                    .subscribeOn(Schedulers.io())
                    .subscribe { lastMeasurements ->
                        Log.d("TAG", Thread.currentThread().name)
                        if (sharedViewModel.isReset.value == true && lastMeasurements.isNotEmpty()) {
                            val t =
                                lastMeasurements.map { (it.time.toDouble() * 1e-9) - (lastMeasurements.last().time.toDouble() * 1e-9) }
                            val accX = lastMeasurements.map { it.accX.toDouble() }
                            val accY = lastMeasurements.map { it.accY.toDouble() }
                            val accZ = lastMeasurements.map { it.accZ.toDouble() }

                            // Update plots
                            updatePlots(t, accX, accY, accZ)
                        }
                    }
            compositeDisposable.clear()
            compositeDisposable.add(disposableDataSource)
        }
    }
})

获取最近10秒测量值的查询

@Query("SELECT * FROM acc_measurements_table WHERE time > ((SELECT MAX(time) from acc_measurements_table)- 1e10)")
fun getLastAccelerations(): Flowable<List<AccMeasurement>>
android kotlin rx-java android-room
1个回答
0
投票

感谢您的评论,我现在知道了瓶颈所在。问题是大量的插入调用,不足为奇。但是可以通过使用某种缓冲区一次插入多行来提高性能。

这是我添加的内容,以防有人在相同的情况下运行:

class InsertHelper(private val repository: Repository){
    var compositeDisposable = CompositeDisposable()

    private val measurementListAcc: FlowableList<AccMeasurement> = FlowableList()
    private val measurementListLinAcc: FlowableList<LinAccMeasurement> = FlowableList()

    fun insertAcc(measurement: AccMeasurement) {
        measurementListAcc.add(measurement)
    }
    fun insertLinAcc(measurement: LinAccMeasurement) {
        measurementListLinAcc.add(measurement)
    }

    init {
        val disposableAcc = measurementListAcc.subject
            .buffer(50)
            .subscribe {measurements ->
                GlobalScope.launch {
                    repository.insertAcc(measurements)
                }
                measurementListAcc.remove(measurements as ArrayList<AccMeasurement>)
            }

        val disposableLinAcc = measurementListLinAcc.subject
            .buffer(50)
            .subscribe {measurements ->
                GlobalScope.launch {
                    repository.insertLinAcc(measurements)
                }
                measurementListLinAcc.remove(measurements as ArrayList<LinAccMeasurement>)
            }

        compositeDisposable.add(disposableAcc)
        compositeDisposable.add(disposableLinAcc)
    }
}
// Dynamic list that can be subscribed on
class FlowableList<T> {
    private val list: MutableList<T> = ArrayList()
    val subject = PublishSubject.create<T>()

    fun add(value: T) {
        list.add(value)
        subject.onNext(value)
    }

    fun remove(value: ArrayList<T>) {
        list.removeAll(value)
    }
}

我基本上使用动态列表来缓冲几十个测量样本,然后将它们整体插入到Room数据库中,并将其从动态列表中删除。这也是为什么批处理插入速度更快的一些信息:https://hackernoon.com/squeezing-performance-from-sqlite-insertions-with-room-d769512f8330

我对Android开发还是很陌生,因此,如果您发现一些错误或有建议,我会感谢您的每条评论:)

© www.soinside.com 2019 - 2024. All rights reserved.