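I want to convert the structure I currently use to a Flow. This may sound silly or irrelevant to you, so let me explain the structure. I have a BleDataSource object. It has functions like the following, for example:
fun performConnect(device: BluetoothDevice, result: ((Resource<BleOperationResult>) -> Unit)) {
    enqueueOperation(Connect(device, result))
}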
fun callback() {
    if ( operation is Connect ) {
        if ( errorOccurred )
            operation.result(Resource.Error("error"))
        else
            operation.result(Resource.Loading())
    }
}
fun anotherCallback() {
    if ( operation is Connect ) {
        operation.result(Resource.Success())
    }
}
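This function is added to an operation queue and connects to the device when its turn comes. I can report the current state of the operation (for example Loading, Success, or Error) because I also store the result callback.
I use this function from the repository as follows:
override fun connect(device: BluetoothDevice, result: (Resource<BleOperationResult>) -> Unit) {
    handler.performConnect(device, result)
}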
However, I would like the repository to return a Flow instead:
override fun connect(device: BluetoothDevice): Flow<Resource<BleOperationResult>> {
    handler.performConnect(device) {
    }
}
How can I do that? (I can also edit the BleDataSource class if that is recommended.)
Edit:
@SuppressLint("MissingPermission")
class BleDataSource @Inject constructor(
    private val handler: Handler
) {
    private val operationQueue = ConcurrentLinkedQueue<BleOperationType>()
    private val operationLock = ReentrantLock()
    private var pendingOperation: BleOperationType? = null

    fun performConnect(device: BluetoothDevice, result: ((Resource<BleOperationResult>) -> Unit)) {
        enqueueOperation(Connect(device, result))
    }

    @Synchronized
    private fun enqueueOperation(operation: BleOperationType) {
        handler.post {
            operationQueue.add(operation)
            if ( !operationLock.isLocked ) {
                doNextOperation()
            }
        }
    }

    @Synchronized
    private fun signalEndOfOperation() {
        handler.post {
            pendingOperation = null
            operationLock.unlock()
            if ( operationQueue.isNotEmpty() ) {
                doNextOperation()
            }
        }
    }

    @Synchronized
    private fun doNextOperation() {
        if ( operationLock.isLocked ) {
            Timber.i("doNextOperation already locked, returning...")
            return
        }
        val operation = operationQueue.poll() ?: run {
            Timber.v("Operation queue empty, returning...")
            return
        }
        operationLock.lock()
        pendingOperation = operation
        if ( operation is Connect ) {
            with(operation) {
                operation.result(Resource.Loading(message = "Connecting to ${device.name}"))
                bluetoothGatt = if ( Build.VERSION.SDK_INT < Build.VERSION_CODES.M ) {
                    device.connectGatt(context, false, gattCallback)
                } else {
                    device.connectGatt(context, false, gattCallback, BluetoothDevice.TRANSPORT_LE)
                }
            }
        }
    }
    // BluetoothGattCallback overrides (excerpt from gattCallback)
    override fun onConnectionStateChange(gatt: BluetoothGatt, status: Int, newState: Int) {
        val deviceAddress = gatt.device.address
        val operation = pendingOperation
        var res: Resource<BleOperationResult> = Resource.Error(errorMessage = "Unknown Error!")
        if ( status == BluetoothGatt.GATT_SUCCESS ) {
            if ( newState == BluetoothProfile.STATE_CONNECTED ) {
                res = Resource.Loading(message = "Discovering Services")
                gatt.discoverServices()
            } else if ( newState == BluetoothProfile.STATE_DISCONNECTED ) {
                res = Resource.Error(errorMessage = "Unexpected Disconnected")
            }
        } else {
            res = Resource.Error(errorMessage = "Error:$status encountered from:$deviceAddress!")
        }
        if ( operation is Connect ) {
            operation.result(res)
        }
        if ( res is Resource.Error ) {
            if ( operation is Connect ) {
                signalEndOfOperation()
            }
        }
    }
    override fun onServicesDiscovered(gatt: BluetoothGatt?, status: Int) {
        val operation = pendingOperation
        var res: Resource<BleOperationResult> = Resource.Error(errorMessage = "Unknown Error!")
        if ( status == BluetoothGatt.GATT_SUCCESS ) {
            res = Resource.Success(data = BleOperationResult.ConnectionResult(profile))
        } else {
            res = Resource.Error(errorMessage = "Failed to discover services...")
        }
        if ( operation is Connect ) {
            operation.result(res)
        }
        if ( pendingOperation is Connect ) {
            signalEndOfOperation()
        }
    }
abstract class BleOperationType {
    abstract val result: ((Resource<BleOperationResult>) -> Unit)
}
data class Connect(
    val device: BluetoothDevice,
    override val result: ((Resource<BleOperationResult>) -> Unit)
) : BleOperationType()
If I understand correctly, this looks like a classic case of converting a "multi-shot" callback into a Flow.
This can be done with callbackFlow:
fun BleDataSource.connect(device: BluetoothDevice): Flow<Resource<BleOperationResult>> = callbackFlow {
    performConnect(device) { res ->
        val result = channel.trySend(res)
        // if the callback caller can make sense of exceptions
        result.exceptionOrNull()?.let { throw it }
    }
    // keeps the flow open until the collector cancels or the channel is closed
    awaitClose()
}
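The problem is that I don't see a way to unregister the callback in the API you provided. Once you have called performConnect with a lambda, it looks like that listener can never be removed. If there is a way to do so, you should do it in the lambda passed to awaitClose { ... }.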
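To illustrate what that could look like, here is a minimal, self-contained sketch. It is not the asker's real class: FakeBleDataSource, its activeListeners counter, and the cancel handle returned by performConnect are all hypothetical, Resource is a simplified stand-in, and the device is just a String. It also assumes that any non-Loading result is terminal, so the flow is closed at that point.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Simplified stand-in for the Resource type from the question.
sealed class Resource<out T> {
    data class Loading(val message: String) : Resource<Nothing>()
    data class Success<T>(val data: T) : Resource<T>()
    data class Error(val errorMessage: String) : Resource<Nothing>()
}

// Hypothetical data source. Unlike the API in the question, performConnect
// returns a handle that unregisters the listener again.
class FakeBleDataSource {
    var activeListeners = 0
        private set

    fun performConnect(device: String, result: (Resource<String>) -> Unit): () -> Unit {
        activeListeners++
        // A real implementation would report these later, from the GATT callbacks.
        result(Resource.Loading("Connecting to $device"))
        result(Resource.Success("connected"))
        return { activeListeners-- }
    }
}

fun FakeBleDataSource.connect(device: String): Flow<Resource<String>> = callbackFlow {
    val cancel = performConnect(device) { res ->
        trySend(res)
        // Assumption: Success and Error end the operation, so complete the flow.
        if (res !is Resource.Loading) close()
    }
    // Runs when the flow completes or the collector cancels.
    awaitClose { cancel() }
}

fun main() = runBlocking {
    val source = FakeBleDataSource()
    println(source.connect("sensor").toList())
    println("listeners still registered: ${source.activeListeners}")
}
```

Closing the channel on the terminal result means collectors finish on their own, and awaitClose gives one place to release the listener whether the flow ended normally or was cancelled.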
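I may not have understood your question correctly, but my understanding is that your data source class exposes a callback-based function to the repository, and you want to convert it into a Flow.
You can simply use callbackFlow to convert the callback into a Flow: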
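override fun connect(device: BluetoothDevice): Flow<Resource<BleOperationResult>> {
    return callbackFlow {
        handler.performConnect(device) { result ->
            trySend(result)
        }
        // required: callbackFlow throws IllegalStateException if the block returns without awaitClose
        awaitClose()
    }
}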
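One thing to keep in mind with this approach: nothing closes the channel after a Success or Error, so the flow never completes by itself. If the data source cannot be changed, one option is to stop on the collecting side. The sketch below assumes that any non-Loading value is terminal, as it appears to be in the question's code. untilTerminal and sampleUpdates are hypothetical names, and Resource is a simplified stand-in for the real type.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transformWhile
import kotlinx.coroutines.runBlocking

// Simplified stand-in for the Resource type from the question.
sealed class Resource<out T> {
    data class Loading(val message: String) : Resource<Nothing>()
    data class Success<T>(val data: T) : Resource<T>()
    data class Error(val errorMessage: String) : Resource<Nothing>()
}

// Hypothetical helper: forwards every value up to and including the first
// non-Loading one, then completes the flow.
fun <T> Flow<Resource<T>>.untilTerminal(): Flow<Resource<T>> = transformWhile { res ->
    emit(res)
    res is Resource.Loading // keep going only while the operation is in progress
}

// Stand-in for the flow returned by connect(); the last value is emitted
// after the terminal result and should never reach the collector.
fun sampleUpdates(): Flow<Resource<String>> = flow {
    emit(Resource.Loading("Connecting"))
    emit(Resource.Loading("Discovering Services"))
    emit(Resource.Success("profile"))
    emit(Resource.Loading("late update"))
}

fun main() = runBlocking {
    println(sampleUpdates().untilTerminal().toList())
}
```

In the repository this could be applied as connect(device).untilTerminal(), which cancels the upstream callbackFlow once the terminal result has been delivered.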
Let me know if this doesn't solve your problem.