我不知道我想要的是否完全可行,但首先让我分享代码。 下面的代码只是创建一个回调流(生产者)并将其发送到视图模型(消费者)。
sealed class Resource<out T:Any>{
data class Success<out T:Any> (val data:T):Resource<T>()
data class Error(val exception: Exception):Resource<Nothing>()
data class Loading(val message:String):Resource<Nothing>()
}
class MyRepository() {
companion object {
const val TAG = "___BBBMyRepository"
}
var globalJob: Job? = null
private fun performSomething(result: ((Resource<Int>) -> Unit)) {
globalJob = GlobalScope.launch {
result(Resource.Loading("Loading-1"))
delay(2500)
result(Resource.Loading("Loading-2"))
delay(2500)
result(Resource.Loading("Loading-3"))
delay(2500)
result(Resource.Error(Exception("Try again...")))
}
}
fun startPerform(): Flow<Resource<Int>> = callbackFlow {
performSomething{ result ->
Log.d(TAG, "performSomething $result")
trySend(result)
}
awaitClose {
globalJob?.cancel()
Log.d(TAG, "startPerform awaitClose")
}
}
}
@HiltViewModel
class BViewModel @Inject constructor(
private val myRepository: MyRepository
): ViewModel() {
companion object {
const val TAG = "___BBBViewModel"
}
init {
Log.i(TAG, "Initialized")
myRepository.startPerform()
.onEach {
Log.i(TAG, it.toString())
}.onCompletion {
Log.i(TAG, "OnCompletion")
}.launchIn(viewModelScope)
}
override fun onCleared() {
super.onCleared()
Log.i(TAG, "OnCleared")
}
}
如果我在流程完成之前用返回键返回,它会触发
awaitClose{}
,流程将成功终止。
但是当出现错误的情况时,我想按照viewmodel的要求做再现过程。
所以我需要以某种方式从 viewmodel 向 startPerform 函数发送请求,就像在 awaitclose 中一样。
我想编写如下代码。这可能吗?
fun startPerform(): Flow<Resource<Int>> = callbackFlow {
performSomething{ result ->
Log.d(TAG, "performSomething $result")
trySend(result)
}
restartFlow {
}
awaitClose {
globalJob?.cancel()
Log.d(TAG, "startPerform awaitClose")
}
}
init {
Log.i(TAG, "Initialized")
myRepository.startPerform()
.onEach {
Log.i(TAG, it.toString())
if ( it is Resource.Error ) {//-----------------
restartFlow //-----------------
}//-----------------
}.onCompletion {
Log.i(TAG, "OnCompletion")
}.launchIn(viewModelScope)
}
如果不能像上面这样写。那么似乎唯一的解决方案是使用如下界面编写。下面的代码有什么我需要改进或者需要注意的地方吗?
interface MyFlowListener {
fun start()
fun completion()
}
var startPerformListener: MyFlowListener? = null
fun startPerform(): Flow<Resource<Int>> = callbackFlow {
startPerformListener = object : MyFlowListener {
override fun start() {
globalJob?.cancel()
performSomething{ result ->
Log.d(TAG, "performSomething $result")
trySend(result)
}
}
override fun completion() {
globalJob?.cancel()
channel.close()
}
}
performSomething{ result ->
Log.d(TAG, "performSomething $result")
trySend(result)
}
awaitClose {
globalJob?.cancel()
Log.d(TAG, "startPerform awaitClose")
}
}
init {
Log.i(TAG, "Initialized")
myRepository.startPerform()
.onEach {
Log.i(TAG, it.toString())
when ( it ) {
is Resource.Error -> {
myRepository.startPerformListener?.start()
}
is Resource.Loading -> {}
is Resource.Success -> {
myRepository.startPerformListener?.completion()
}
}
}.onCompletion {
Log.i(TAG, "OnCompletion")
}.launchIn(viewModelScope)
}
您不会在
callbackFlow
块中定义重试行为。您可以使用现有的 retry
流运算符。例如:
init {
Log.i(TAG, "Initialized")
class ResourceError: Exception()
myRepository.startPerform()
.onEach {
Log.i(TAG, it.toString())
if (it is Resource.Error) {
throw ResourceError()
}
//...
}
.retry { it is ResourceError }
.onCompletion {
Log.i(TAG, "OnCompletion")
}.launchIn(viewModelScope)
}
您的回调流程实施存在缺陷。它一次只能处理一个
globalJob
,所以如果一次收集多个流,其中一些会被泄漏。如果在并行收集时取消了任何一个,那么最新收集的可能会提前取消。
我不确定你是否只是想使用这个全局协程模拟一个实际的基于回调的 API,因为它真的很 hacky。如果您自己设计后端,则没有理由首先使用
callbackFlow
。例如:
class MyRepository() {
companion object {
const val TAG = "___BBBMyRepository"
}
fun startPerform(): Flow<Resource<Int>> = flow {
emit(Resource.Loading("Loading-1"))
delay(2500)
emit(Resource.Loading("Loading-2"))
delay(2500)
emit(Resource.Loading("Loading-3"))
delay(2500)
emit(Resource.Error(Exception("Try again...")))
}
}
如果您尝试使用协程临时模拟基于回调的 API 以测试您对
callbackFlow
的使用,那么您需要对其进行修改,使其可以同时处理多个“侦听器”。也许是这样的:
class MyRepository() {
companion object {
const val TAG = "___BBBMyRepository"
}
private fun performSomething(result: ((Resource<Int>) -> Unit)): Job {
return GlobalScope.launch {
result(Resource.Loading("Loading-1"))
delay(2500)
result(Resource.Loading("Loading-2"))
delay(2500)
result(Resource.Loading("Loading-3"))
delay(2500)
result(Resource.Error(Exception("Try again...")))
}
}
fun startPerform(): Flow<Resource<Int>> = callbackFlow {
val job = performSomething { result ->
Log.d(TAG, "performSomething $result")
trySend(result)
}
awaitClose {
job.cancel()
Log.d(TAG, "startPerform awaitClose")
}
}
}