我有两个假设的数据流
intFlow
和 stringFlow
,我想将它们合并并通过管道传输到 UI。 stringFlow
有点不稳定,所以我使用 debounce(2000)
来“放慢速度”。然而,intFlow
中的数据流速度很慢,因此不应受到此 debouce 调用的影响。
我尝试过像
combine()
这样的功能,但它不起作用,因为intFlow
等待我不想要的stringFlow
。
我也尝试过
flatMapMerge{}
,但这也不起作用,不幸的是,我不明白为什么,因为我对这个函数的理解是它是异步的,并且传递给它的流不应该互相阻塞。
我最终使用了
merge()
,但由于数据类型不同,而且我需要稍后合并这两个数据,所以我最终使用了一个 hack:
val intFlow = MutableStateFlow(0)
val stringFlow = MutableStateFlow("")
fun transform(result: suspend (s: String, i: Int) -> Flow<String>): Flow<String> {
return merge(intFlow, stringFlow.debounce(2000))
.map {
if (it is Int) Pair(stringFlow.value, it) else Pair(it as String, intFlow.value)
}
.filter { (s, i) -> s.isNotBlank() && i > 0 }
.distinctUntilChanged()
.flatMapLatest { (s, i) -> result(s, i) }
// return intFlow.flatMapMerge { i ->
// stringFlow.debounce(2000).map { s -> Pair(s, i) }
// }
// .filter { (s, i) -> s.isNotBlank() && i > 0 }
// .distinctUntilChanged()
// .flatMapLatest { (s, i) -> result(s, i) }
}
val combinedFlow = transform { s, i ->
return@transform flow {
emit("$s::$i")
}
}
fun getCurrentTime(): String {
val currentDateTime = LocalDateTime.now()
val formatter = DateTimeFormatter.ofPattern("HH:mm:ss:SSS")
return currentDateTime.format(formatter) + " | ${
currentDateTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()
}"
}
suspend fun main() {
val mainJob = CoroutineScope(Dispatchers.Default).launch {
println("${getCurrentTime()} - Updating String to 'a'")
stringFlow.update { "a" }
println("${getCurrentTime()} - Updating Int to 1")
intFlow.update { 1 }
println("${getCurrentTime()} - Updating String to 'a'")
stringFlow.update { "a" }
println("${getCurrentTime()} - Updating Int to 2")
intFlow.update { 2 }
println("${getCurrentTime()} - Delaying for 5 seconds")
delay(5000)
println("${getCurrentTime()} - Updating String with 'b'")
stringFlow.update { "b" }
println("${getCurrentTime()} - Updating Int to 3")
intFlow.update { 3 }
}
val collectJob = CoroutineScope(Dispatchers.Default).launch {
combinedFlow.collect {
println("${getCurrentTime()} - Combined: $it-------- ")
}
}
mainJob.join()
collectJob.join()
}
这可行,但我担心黑客攻击:
.map {
if (it is Int) Pair(stringFlow.value, it) else Pair(it as String, intFlow.value)
}
如果我可能会遇到并发问题,例如使用过时的数据或其他问题。
使用
flatMapMerge{}
,我得到以下输出:
21:14:25:883 | 1713273265883 - Updating String to 'a'
21:14:25:891 | 1713273265891 - Updating Int to 1
21:14:25:892 | 1713273265892 - Updating String to 'a'
21:14:25:892 | 1713273265892 - Updating Int to 2
21:14:25:892 | 1713273265892 - Delaying for 5 seconds
21:14:27:929 | 1713273267929 - Combined: a::2--------
21:14:30:898 | 1713273270898 - Updating String with 'b'
21:14:30:899 | 1713273270899 - Updating Int to 3
21:14:32:910 | 1713273272910 - Combined: b::2--------
21:14:32:910 | 1713273272910 - Combined: b::3--------
int 的发布和接收之间有大约 2 秒的延迟。
将此与
merge()
进行比较:
21:15:54:969 | 1713273354969 - Updating String to 'a'
21:15:54:974 | 1713273354974 - Updating Int to 1
21:15:54:975 | 1713273354975 - Updating String to 'a'
21:15:54:975 | 1713273354975 - Updating Int to 2
21:15:54:975 | 1713273354975 - Delaying for 5 seconds
21:15:55:009 | 1713273355009 - Combined: a::2--------
21:15:59:986 | 1713273359986 - Updating String with 'b'
21:15:59:987 | 1713273359987 - Updating Int to 3
21:15:59:991 | 1713273359991 - Combined: b::3--------
而且发布和接收之间几乎没有延迟。
我尝试过像
这样的功能,但它不起作用,因为combine()
等待我不想要的intFlow
。stringFlow
这不是真的,
intFlow
不会等待stringFlow
。当底层流中的any具有新值时,组合流会发出新值,而不是当它们中的all发出新值时。 这似乎正是您需要的行为:当
intFlow
发出时,结果流将始终发出,并将使用
stringFlow
中的 最新可用值。另一方面,当去抖的
stringFlow
最终发出新值时,生成的流现在将also 发出。 唯一的限制是您必须确保去抖流至少发出
one 值,以便合并可以开始。如果您无法轻松重构stringFlow
以像这样工作,您始终可以在去抖动之后应用
onStart
以发出初始值,然后再将其与
intFlow
组合。