我正在使用RxJava
,我知道concat
,我想它确实适合我,因为我想先完成所有的第一个电话,然后做第二个,但我不知道如何实现它。
我从现在开始这样做:
private fun assignAllAnswersToQuestion(questionId: Long) {
answerListCreated.forEach { assignAnswerToQuestion(questionId, it.id) }
}
private fun assignAnswerToQuestion(questionId: Long, answerId: Long) {
disposable = questionService.addAnswerToQuestion(questionId,answerId,MyUtils.getAccessTokenFromLocalStorage(context = this))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{
result -> //Do nothing it should call the next one
},
{ error -> toast(error.message.toString())}
)
}
但是,一旦完成所有这些forEach
,我想做这样的事情:
private fun assignAllAnswersToQuestion(questionId: Long) {
answerListCreated.forEach { assignAnswerToQuestion(questionId, it.id)
anotherCallHere(questionId) //Do it when the first forEach is finished!!
}
任何的想法?
此外,这是一个与协同程序这样做的方法吗?
我想你必须将.map
列表(answerListCreated
)列入Flowable
s,然后在此列表中使用Flowable.zip
。
zip
用于将Flowable
s的结果合并为一个结果。由于您不需要这些结果,我们会忽略它们。
在zip
之后你肯定所有以前的Flowable
s结束了,你可以.flatMap
执行你的下一个电话(假设anotherCallHere
返回Flowable
。
最后,它将是这样的:
val flowableList = answerListCreated.map { assignAnswerToQuestion(questionId, it.id) }
disposable = Flowable.zip(flowableList) { /* Ignoring results */ }
.flatMap { anotherCallHere(questionId) }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
// ...
}
应该注意的是,如果任何调用失败,整个链将失败(将调用onError
)。
我是协同程序的新手,但我想我可以回答他们:
您可以使用coroutines runBlocking {}来实现此目的。
private fun assignAllAnswersToQuestion(questionId: Long) = launch {
runBlocking {
answerListCreated.forEach { assignAnswerToQuestion(questionId, it.id) }
}
anotherCallHere(questionId)
}
private fun assignAnswerToQuestion(questionId: Long, answerId: Long) = launch (Dispatchers.IO) {
questionService.addAnswerToQuestion(
questionId,
answerId,
MyUtils.getAccessTokenFromLocalStorage(context = this)
)
}
launch {}返回一个Job对象,该对象成为父协程的子作业。 runBlocking {}将阻塞,直到它的所有子作业都完成,(另一种方法是使用launch {}。join(),它具有相同的效果)。
请注意,我已将这两个函数包装在launch {}块中。为了能够像这样调用launch {},你可能希望让你的类实现CoroutineScope
class MyActivityOrFragment: Activity(), CoroutineScope {
lateinit var job = SupervisorJob()
private val exceptionHandler =
CoroutineExceptionHandler { _, error ->
toast(error.message.toString()
}
override val coroutineContext = Dispatchers.Main + job + exceptionHandler
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
job = Job()
}
override fun onDestroy() {
super.onDestroy()
job.cancel()
}
...
}