我有一种情况,需要观察userId,然后使用这些userId观察用户。用户ID或用户都可以随时更改,我想使发出的用户保持最新状态。这是我拥有的数据源的示例:
data class User(val name: String)
fun observeBestUserIds(): Flow<List<String>> {
return flow {
emit(listOf("abc", "def"))
delay(500)
emit(listOf("123", "234"))
}
}
fun observeUserForId(userId: String): Flow<User> {
return flow {
emit(User("${userId}_name"))
delay(2000)
emit(User("${userId}_name_updated"))
}
}
在这种情况下,我希望排放量是:
[[User(abc_name), User(def_name)]
,然后]
[[User(123_name), User(234_name)]
,然后]
[User(123_name_updated), User(234_name_updated)]
我想我可以像这样在RxJava中实现:
observeBestUserIds.concatMapSingle { ids ->
Observable.fromIterable(ids)
.concatMap { id ->
observeUserForId(id)
}
.toList()
}
我将编写什么函数来生成发出该消息的流程?
您可以使用flatMapConcat
val users = observeBestUserIds()
.flatMapConcat { ids ->
flowOf(*ids.toTypedArray())
.map { id ->
observeUserForId(id)
}
}
.flattenConcat()
.toList()