与kotlin协程流中的rxjava .toList()等效

问题描述 投票:0回答:1

我有一种情况,需要观察userId,然后使用这些userId观察用户。用户ID或用户都可以随时更改,我想使发出的用户保持最新状态。这是我拥有的数据源的示例:


data class User(val name: String)

fun observeBestUserIds(): Flow<List<String>> {
    return flow {
        emit(listOf("abc", "def"))
        delay(500)
        emit(listOf("123", "234"))
    }
}

fun observeUserForId(userId: String): Flow<User> {
    return flow {
        emit(User("${userId}_name"))
        delay(2000)
        emit(User("${userId}_name_updated"))
    }
}

在这种情况下,我希望排放量是:

[[User(abc_name), User(def_name)],然后]

[[User(123_name), User(234_name)],然后]

[User(123_name_updated), User(234_name_updated)]

我想我可以像这样在RxJava中实现:

observeBestUserIds.concatMapSingle { ids ->
    Observable.fromIterable(ids)
        .concatMap { id ->
            observeUserForId(id)
        }
        .toList()
}

我将编写什么函数来生成发出该消息的流程?

kotlin rx-java kotlin-coroutines kotlin-flow
1个回答
0
投票

您可以使用flatMapConcat

val users = observeBestUserIds()
        .flatMapConcat { ids ->
            flowOf(*ids.toTypedArray())
                .map { id ->
                    observeUserForId(id)
                }
        }
        .flattenConcat()
        .toList()
© www.soinside.com 2019 - 2024. All rights reserved.