我有一个“流量源”,我想给一个“消费者”一个SharedFlow。我希望一些消费者能够响应流中的数据,而其他消费者也会向流发射数据。每当数据被发送到流中时,我想用 onEach 记录它。
class FlowSource {
public dataFlow: MutableSharedFlow<Data> = MutableSharedFlow(0, 5).forEarch {Log(it}}
}
上面返回错误,因为 .onEach 返回一个新的 Flow。如果我尝试使用 .shareIn(xxxx) 它会创建一个 SharedFlow 而不是一个 Mutable 共享流。我如何返回一个已经添加了 .onEach 逻辑的 MutableSharedFlow,这样我就不必在消费者上添加 .onEach?
是的,没有办法直接创建一个内置副作用的 MutableSharedFlow。您基本上必须给它一个执行额外操作的收集器。像这样的东西:
val dataFlow: MutableSharedFlow<Data> = MutableSharedFlow(0, 5).apply {
onEach { Log(it} }.launchIn(someCoroutineScope)
}
上面的someCoroutineScope
需要是一个适当的范围,当这个dataFlow
超出范围时被取消,所以我们不会泄漏它。