我正在尝试创建一个流程,向我的 Android 应用程序发送信号以将数据上传到网络。数据存储在SQLite数据库中,并将分批上传。有 3 种不同的方式来触发此上传:
我的流程看起来像是
SharedFlow<Unit>
的合并
private val databaseFlow = dataSource.getEventBatchStream(batchSize)
private val timerFlow = TickerFlowFactory().createTickerFlow()
private val userRequestFlow: MutableStateFlow<Unit> = MutableStateFlow(Unit)
saveJob = scope.launch {
merge(databaseFlow, timerFlow, userRequestFlow)
.collectLatest {
saveEventBatch(dataSource.getEventBatch(batchSize))
}
}
似乎由于合并,事件被丢弃。我已经能够通过使用这样的渠道来解决这个问题
private val databaseFlow = dataSource.getEventBatchStream(batchSize)
private val timerFlow = TickerFlowFactory().createTickerFlow()
private val requestToSaveChannel = Channel<Unit>() // Here's the channel
timerJob = scope.launch {
timerFlow.collect { requestToSaveChannel.send(Unit) }
}
dbJob = scope.launch {
databaseFlow
.filter { it.size == batchSize }
.collect { requestToSaveChannel.send(Unit) }
}
saveJob = scope.launch {
requestToSaveChannel.receiveAsFlow()
.collect { savePeteEventBatch(dataSource.getEventBatch(batchSize)) }
}
这可行,但在习惯上似乎是错误的,我觉得应该有一个仅使用流程的解决方案,但我还没能弄清楚。
我不确定这是否有效,但值得一试......
我的第一个想法是过滤
private val databaseFlow = dataSource.getEventBatchStream(batchSize)
以确保仅触发正确大小的批次,然后添加延迟以防止合并
所以它看起来像这样:
private val databaseFlow = dataSource.getEventBatchStream(batchSize)
.filter { it.size == batchSize }
.onEach { delay(1) } // Add a slight delay to prevent conflation
private val timerFlow = TickerFlowFactory().createTickerFlow()
private val userRequestFlow: MutableSharedFlow<Unit> = MutableSharedFlow()
然后我们可以使用buffer来确保缓冲区中只保留最新的事件,防止不必要的背压,避免丢弃事件。
但是还有另一个名为 flatMapLatest 的扩展函数,它返回一个流,每当原始流发出一个值时,该流就会切换到由变换函数生成的新流并且默认情况下会对其进行缓冲
我们可以使用它。
在您的场景中使用它的示例如下:
saveJob = scope.launch {
flatMapLatest(databaseFlow, timerFlow, userRequestFlow) { _ ->
flow { emit(dataSource.getEventBatch(batchSize)) }
}.collect { batch ->
saveEventBatch(batch)
}
}
这里我们使用 flatMapLatest 每当任何其他流发出值时发出事件批次,然后我们保存它
监控多个流:flatMapLatest 充当“流合并器”,持续监控所有三个流:databaseFlow、timerFlow 和 userRequestFlow。
对最新的发射做出反应:一旦这些流中的任何一个发出值,flatMapLatest 就会立即采取行动。它取消了之前流量中的任何持续排放,并仅关注最新的排放。
触发批量获取:对于发出值的流,flatMapLatest 执行提供的 lambda:
flow { emit(dataSource.getEventBatch(batchSize)) }
。这将创建一个从数据源获取事件批次的新流程。
保存批次:然后使用collect收集新创建的流,该流发出获取的批次。在collect中,调用saveEventBatch函数将批次保存到网络。
关键点:flatMapLatest确保仅处理和保存由三个流中的任何一个触发的最新批次。这可以防止合并并确保事件不会被意外删除或覆盖。
我真的希望这能起作用,因为我花了大约一个小时搜索这些信息
无论这是否有效,请随时通知我:)
我发现两个可能的问题,具体取决于您期望的行为。
你的
userRequestFlow
是一个MutableStateFlow<Unit>
,本质上是一个空操作,因为当你给它一些等于它的StateFlow
的东西时,value
不会发出事件。因此,这里必须切换到 MutableSharedFlow
。
如果问题是在
collectLatest
完成之前总是有一个事件,您需要使用 collect
和 conflate
,如下所示:
private val databaseFlow = dataSource.getEventBatchStream(batchSize)
private val timerFlow = TickerFlowFactory().createTickerFlow()
private val userRequestFlow: MutableSharedFlow<Unit> = MutableSharedFlow(Unit)
saveJob = scope.launch {
merge(databaseFlow, timerFlow, userRequestFlow)
.conflate()
.collect {
saveEventBatch(dataSource.getEventBatch(batchSize))
}
}
conflate
运算符在等待 collect
完成时丢弃发出的值,但将最新值保留在缓冲区中。因此,一旦 collect
完成,如果在此期间至少产生了一个事件,它将再次运行 saveEventBatch
。如果没有产生事件,它将等待下一个事件。