将 lambda 结果隐藏起来

问题描述 投票:0回答:2

我有意将我目前使用的结构转换为流。这听起来可能很愚蠢或与您无关。 让我给你解释一下结构:

  • 我有一个名为
    BleDataSource
    的对象。例如,它具有如下功能。
    fun performConnect(device: BluetoothDevice, result: ((Resource<BleOperationResult>) -> Unit)) {
        enqueueOperation(Connect(device, result))
    }

    fun callback() {
        if ( operation is Connect ) {
           if  ( erroroccured )
               operation.result(Resource.Error("error"))
           else
               operation.result(Resource.Loading())
        }
    }

    fun onetherCallback() {
        if ( operation is Connect ) {
               operation.result(Resource.Success())
        }
    }

  • 此功能保存到作业队列并在轮到时连接到设备。我可以报告作业的当前状态(例如加载、成功或错误),因为我还保存了

    result
    .

  • 从存储库我使用这个功能如下:

    override fun connect(device: BluetoothDevice, result: (Resource<BleOperationResult>) -> Unit) {
        handler.performConnect(device, result)
    }

但我希望它从存储库中以流的形式返回。

    override fun connect(device: BluetoothDevice): Flow<Resource<BleOperationResult> {
        handler.performConnect(device) {
        }
    }

我该怎么做? (根据推荐我也可以编辑

BleDataSource
班级)

编辑:

我的队列机制:


@SuppressLint("MissingPermission")
class BleDataSource  @Inject constructor(
    private val handler: Handler
) {

    private val operationQueue = ConcurrentLinkedQueue<BleOperationType>()
    private val operationLock = ReentrantLock()
    private var pendingOperation: BleOperationType? = null

    fun performConnect(device: BluetoothDevice, result: ((Resource<BleOperationResult>) -> Unit)) {
        enqueueOperation(Connect(device, result))
    }

    @Synchronized
    private fun enqueueOperation(operation: BleOperationType) {
        handler.post {
            operationQueue.add(operation)
            if ( !operationLock.isLocked ) {
                doNextOperation()
            }
        }
    }


    @Synchronized
    private fun signalEndOfOperation() {
        handler.post {
            pendingOperation = null
            operationLock.unlock()
            if ( operationQueue.isNotEmpty() ) {
                doNextOperation()
            }
        }
    }


    @Synchronized
    private fun doNextOperation() {
        if ( operationLock.isLocked ) {
            Timber.i("doNextOperation already locked, returning...")
            return
        }

        val operation = operationQueue.poll() ?: run {
            Timber.v("Operation queue empty, returning...")
            return
        }
        operationLock.lock()
        pendingOperation = operation


        if ( operation is Connect ) {
            with(operation) {
                operation.result(Resource.Loading(message = "Connecting to ${device.name}"))
                bluetoothGatt = if ( Build.VERSION.SDK_INT < Build.VERSION_CODES.M ) {
                    device.connectGatt(context, false, gattCallback)
                } else {
                    device.connectGatt(context, false, gattCallback, BluetoothDevice.TRANSPORT_LE)
                }
            }
        }

    }


        override fun onConnectionStateChange(gatt: BluetoothGatt, status: Int, newState: Int) {
            val deviceAddress = gatt.device.address
            val operation = pendingOperation
            var res: Resource<BleOperationResult> = Resource.Error(errorMessage = "Unknown Error!")

            if ( status == BluetoothGatt.GATT_SUCCESS ) {
                if ( newState == BluetoothProfile.STATE_CONNECTED ) {
                    res = Resource.Loading(message = "Discovering Services")
                    gatt.discoverServices()
                } else if ( newState == BluetoothProfile.STATE_DISCONNECTED ) {
                    res = Resource.Error(errorMessage = "Unexpected Disconnected")
                }
            } else {
                res = Resource.Error(errorMessage = "Error:$status encountered fro:$deviceAddress!")
            }

            if ( operation is Connect ) {
                operation.result(res)
            }
            if ( res is Resource.Error ) {
                if ( operation is Connect  ) {
                signalEndOfOperation()
                }
            }
        }


        override fun onServicesDiscovered(gatt: BluetoothGatt?, status: Int) {
            val operation = pendingOperation
            var res: Resource<BleOperationResult> = Resource.Error(errorMessage = "Unknown Error!")

            if ( status == BluetoothGatt.GATT_SUCCESS ) {
              res = Resource.Success(data = BleOperationResult.ConnectionResult(profile))
                }
            } else {
                res = Resource.Error(errorMessage = "Failed to discover services...")
            }

            if ( operation is Connect ) {
                operation.result(res)
            }
            if ( pendingOperation is Connect ) {
                signalEndOfOperation()
            }
        }

abstract class BleOperationType {
    abstract val result: ((Resource<BleOperationResult>) -> Unit)
}
data class Connect(val device: BluetoothDevice,
                   override val result: ((Resource<BleOperationResult>) -> Unit)) : BleOperationType()

android kotlin flow
2个回答
3
投票

如果我没有理解错的话,这看起来像是一个经典的将“多镜头”回调函数转换为流的案例。

这可以使用 callbackFlow 来完成:

fun BleDataSource.connect(device: BluetoothDevice): Flow<Resource<BleOperationResult>> = callbackFlow {
    performConnect(device) { res ->
        val result = channel.trySend(res)

        // if the callback caller can make sense of exceptions
        result.exceptionOrNull?.let { throw it }
    }
    awaitClose()
}

问题是我看不到从您提供的 API 中注销回调的方法。一旦你用 lambda 调用了

performConnect
,看起来这个 lambda 监听器就不能被删除了。如果有办法做到这一点,你应该在传递给
awaitClose { ... }
的lambda中做到这一点。


1
投票

我可能无法正确理解你的问题,但我的理解是你在数据类中有一个函数在存储库中提供回调,你想在流程中转换它。

您可以简单地使用回调流并将回调转换为流 -

override fun connect(device: BluetoothDevice): Flow<Resource<BleOperationResult> {
    return callbackFlow {
             handler.performConnect(device) { result ->
                 trySend(result)
             }
           }
}

如果这不能解决您的问题,请告诉我。

最新问题
© www.soinside.com 2019 - 2024. All rights reserved.