Android BLE: writing a large file in a blocking way using RxAndroidBle

Problem description

I have an Android app that uses RxAndroidBle as its BLE solution. It is really great and has saved me a lot of work.

Recently, however, I had to implement a firmware update feature, and I am stuck.

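One of our custom BLE devices requires the app to wait for a notification (a ByteArray) before writing. After every 16 packets (20 bytes each), the device sends a notification ByteArray on a particular characteristic.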
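To make the packet layout concrete, here is a minimal sketch of how a firmware image could be cut into 20-byte packets and grouped into batches of 16. The names `packetSize`, `packetsPerBatch` and `toBatches` are hypothetical and only illustrate the numbers given above; they are not part of RxAndroidBle or of the code in this question.

```kotlin
// Sizes taken from the description above; the names are hypothetical.
val packetSize = 20        // bytes per BLE write
val packetsPerBatch = 16   // packets sent between two notifications

/** Splits the image into 20-byte packets, then groups the packets into batches of 16. */
fun toBatches(firmware: ByteArray): List<List<ByteArray>> =
    firmware.asList()
        .chunked(packetSize) { it.toByteArray() }  // the last packet may be shorter
        .chunked(packetsPerBatch)                  // the last batch may hold fewer packets
```

For a 650-byte image this yields three batches holding 16, 16 and 1 packets, and the final packet carries the remaining 10 bytes.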

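So what I am trying to do is wait for the notification and then send the next firmware packets. I also found that I had to add a 160 ms timer so that the device is not flooded with packets (backpressure?).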
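The "wait for the notification, then send the next packets" idea can be modelled without any BLE library. The sketch below is only an illustration under assumptions: `BatchGate`, `onNotification` and `sendAll` are hypothetical names, the `write` callback stands in for the real characteristic write, and the device is assumed to notify after every batch, including the last one. It blocks on a semaphore instead of relying on a fixed delay.

```kotlin
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit

/** Sends one batch at a time and blocks until the device has confirmed it. */
class BatchGate(private val timeoutMs: Long) {
    private val acks = Semaphore(0)

    /** Call this from the notification callback when the device confirms a batch. */
    fun onNotification() = acks.release()

    /** Returns how many batches were confirmed before a timeout stopped the transfer. */
    fun sendAll(batches: List<List<ByteArray>>, write: (ByteArray) -> Unit): Int {
        var confirmed = 0
        for (batch in batches) {
            batch.forEach(write)  // send every packet of this batch
            // Block until the notification arrives, or give up after timeoutMs.
            if (!acks.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) break
            confirmed++
        }
        return confirmed
    }
}
```

The return value makes a stalled transfer visible: if it is smaller than the number of batches, the device stopped confirming at that point.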

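Still no luck. The device becomes unresponsive after a certain amount of data and then disconnects, for example after 256 bytes * 12 (the file size ranges from 256 bytes * 330 to 256 bytes * 785).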
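As a rough sanity check of these figures, the arithmetic can be written out. This is only a sketch: it reads "256 bytes * 330 to 785" as 330 to 785 blocks of 256 bytes, and the helper names `batchesFor` and `minimumDelayMs` are hypothetical.

```kotlin
val packetSize = 20
val packetsPerBatch = 16
val batchBytes = packetSize * packetsPerBatch  // 320 bytes between two notifications

/** Number of 16-packet batches needed for the given byte count (ceiling division). */
fun batchesFor(totalBytes: Int): Int = (totalBytes + batchBytes - 1) / batchBytes

/** Time spent in the fixed 160 ms delay alone, ignoring the actual write time. */
fun minimumDelayMs(totalBytes: Int, delayPerBatchMs: Long = 160): Long =
    batchesFor(totalBytes) * delayPerBatchMs
```

Under that reading, the failure after 256 * 12 = 3072 bytes falls inside the 10th batch, while a complete image needs 264 to 628 batches, so the fixed delay alone adds roughly 42 to 100 seconds.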

This is the current implementation:

 .flatMap { ifFrameCountAccepted ->
                if (ifFrameCountAccepted) {
                    Timber.d("Wait 3 seconds cleaning up flash")
                    Flowable
                        .timer(3000, TimeUnit.MILLISECONDS)
                        .flatMap { sendFramesFlowable(firmware, isPic) }
                } else {
                    Flowable.error(RuntimeException("MCU L2 frame count error."))
                }
            }
            .toObservable()
            .flatMap { isFirmwareTransmissionDone ->
                if (isFirmwareTransmissionDone) {
                    waitUntilL2McuUpgradeFinish(isPic)
                } else {
                    Observable.just(false)
                }
            }

    private fun sendFramesFlowable(
        firmware: FirmwareUpgradeData,
        isPic: Boolean
    ): Flowable<Boolean> {
        Timber.d("Firmware size: ${firmware.processed.size}")

        val frameInvalidPublishSubject = PublishSubject.create<Boolean>()
        val frameObservable = connection.toFlowable(BackpressureStrategy.BUFFER).flatMap { rxConnection ->
            rxConnection.setupNotification(
                RC_NOTIFICATION_CHARACTERISTIC,
                NotificationSetupMode.DEFAULT
            ).toFlowable(BackpressureStrategy.BUFFER)
                .concatMap { notification ->
                    notification.toFlowable(BackpressureStrategy.BUFFER)
                }
            }
                .filter { it.size == 20 }
                .filter { it.second.first().toUnsignedValue() == COMMAND_HEADER_L2_FRAME }
                .map { reply ->
                    Timber.d("[A2] decoded: ${reply.second.toHex()}")
                    val payload = reply.second.dataPayload(reply.first)
                    val isValid = upgradeDataTransmission.resolveFrameCommand(payload)
                    Timber.d("MCU L2 upgrade frame is accepted: $isValid")
                    if (!isValid) {
                        frameInvalidPublishSubject.onNext(true)
                    }
                    isValid
                }
            .zipWith(Flowable.range(1, firmware.frameCount), BiFunction { _: Boolean, frameCount: Int ->
                frameCount
            })
            .doOnNext { frameCount ->
                val base = if (isPic) 45.minus(25) else 99.minus(45)
                val progress = base.div(firmware.frameCount.toFloat())
                    .times(frameCount).toInt()
                    .plus(if (isPic) 25 else 45)
                _upgradeProgress.postValue(Event.success(progress))
            }
            .flatMap { frameCount ->
                Timber.d("frame count now is:$frameCount")
                if (frameCount == firmware.frameCount) {
                    triggerL2Upgrade(firmware.crcCheck)
                } else {
                    Flowable.just(false)
                }
            }

        Flowable.fromIterable(firmware.processed.withIndex())
            .buffer(16)
            .takeUntil(frameInvalidPublishSubject.toFlowable(BackpressureStrategy.BUFFER))
            .concatMap { frame ->
                Flowable.timer(160, TimeUnit.MILLISECONDS).concatMap {
                    Flowable.fromIterable(frame)
                        .concatMap { perPackage ->
                            connection.toFlowable(BackpressureStrategy.BUFFER)
                                .concatMap {
                                    it.writeCharacteristic(RC_WRITE_CHARACTERISTIC, perPackage.value.toByteArray())
                                        .toFlowable()
                                }
                        }
                }
            }
            .forEachWhile {
                true
            }

        return frameObservable
    }

android bluetooth-lowenergy rx-java2 rxandroidble
1 answer

RxAndroidBle has a built-in helper for exactly this situation: RxBleConnection.createNewLongWriteBuilder()

You can read up on exactly what it offers, but the key point is that setWriteOperationAckStrategy() lets you postpone the write of the next byte array.

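// Emits once per device notification, i.e. after every 16th write; its setup is left out here.
val notificationsEvery16thWrite: Observable<ByteArray> = (...)
rxBleConnection.createNewLongWriteBuilder()
    .setCharacteristicUuid(RC_WRITE_CHARACTERISTIC)
    .setBytes(firmwareBytes) // assumption: firmwareBytes is the whole image as one ByteArray
    .setWriteOperationAckStrategy { writeAcks ->
        // Release every write ACK immediately, except each 16th one.
        val emitEveryWriteAckApartEvery16th = writeAcks
            .scan(0 to null as Boolean?) { acc, boolean ->
                acc.first.plus(1) to boolean
            }
            .filter { it.first != 0 && it.first % 16 != 0 }
            .map { it.second!! }
        // Release each 16th write ACK only once the matching notification has arrived.
        val emitEvery16thWriteAckAfterNotification = notificationsEvery16thWrite.zipWith(
            writeAcks.buffer(16), // collect 16 write ACKs
            BiFunction { _, writesCompleted -> writesCompleted.last() }) // emit the last ACK when the notification arrives
        Observable.merge(
            emitEveryWriteAckApartEvery16th,
            emitEvery16thWriteAckAfterNotification
        )
    }
    .build() // the write only starts once the returned Observable is subscribed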
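The gating that this ACK strategy aims for can also be modelled in plain Kotlin, without RxJava, to check the expected behaviour. This is a simplified sketch rather than the library's implementation; `waitsForNotification` and `releasedWrites` are hypothetical names introduced only for illustration.

```kotlin
/** True when the ACK of the given 1-based write must wait for a device notification. */
fun waitsForNotification(writeIndex: Int, batchSize: Int = 16): Boolean =
    writeIndex % batchSize == 0

/** Lists the writes whose ACKs are released, given how many notifications have arrived. */
fun releasedWrites(writeCount: Int, notificationsReceived: Int, batchSize: Int = 16): List<Int> {
    val released = mutableListOf<Int>()
    var notificationsLeft = notificationsReceived
    for (index in 1..writeCount) {
        if (waitsForNotification(index, batchSize)) {
            if (notificationsLeft == 0) break  // the writer stalls here until the device notifies
            notificationsLeft--
        }
        released += index
    }
    return released
}
```

With no notification the writer stops after releasing 15 ACKs, because the 16th is held back. Each notification that arrives unlocks the held ACK and the 15 that follow it.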