Given my code below using Flux
groupBy
and .take(Duration.ofMillis(10)),
I can process roughly 50K records per second. When testing on localhost with Flux.just
I can set the delay to 1 ms and reach 500K records per second, but I don't dare do that in production.
The data comes from a database (BigQuery) and is already sorted by the grouping fields. Is there a way to avoid
.take(Duration.ofMillis(10))
and its fixed delay, and use e.g. takeUntil
instead?
data class IncomingDto(
    val aggregation: Aggregation,
    val municipalityId: String,
    val personId: String,
    val startDate: LocalDate,
    val endDate: LocalDate? = null,
)

data class ResultDto(
    val aggregation: Aggregation,
    val municipalityId: String,
    val itemCount: Int,
)

fun reduceResult(
    someDtoFlux: Flux<IncomingDto>
): Flux<ResultDto> = someDtoFlux
    .groupBy { it.aggregation } // it is grouped by 4 fields in my prod code
    .flatMap { groupFlux ->
        groupFlux
            .take(Duration.ofMillis(10)) // https://stackoverflow.com/a/47906950/14072498
            .collectList()
            .mapNotNull { currentSomeDtoList ->
                // some reducing here
                // mock result
                ResultDto(
                    aggregation = currentSomeDtoList.first().aggregation,
                    municipalityId = currentSomeDtoList.first().municipalityId,
                    itemCount = 42
                )
            }
    }
If and only if your incoming data is already sorted by the key/group/aggregation, you do not need grouping at all. You can instead use windowUntilChanged to split the data sequentially, which greatly simplifies the workflow, and do something like:
someDtoFlux
    .windowUntilChanged { it.aggregation }
    .flatMap { groupFlux ->
        groupFlux
            .collectList()
            .mapNotNull { currentSomeDtoList ->
                // some reducing here
                // mock result
                ResultDto(
                    aggregation = currentSomeDtoList.first().aggregation,
                    municipalityId = currentSomeDtoList.first().municipalityId,
                    itemCount = currentSomeDtoList.size
                )
            }
    }
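As a side note (my assumption, not part of the original answer): since each window closes before the next one opens, Reactor's bufferUntilChanged operator can replace the windowUntilChanged + collectList pair and emit each group directly as a List. A minimal sketch, with made-up names (Row, countGroups):

```kotlin
import reactor.core.publisher.Flux

data class Row(val key: Int, val value: Int)

// bufferUntilChanged emits a List each time the extracted key changes,
// so consecutive rows with the same key end up in the same buffer.
fun countGroups(): List<Pair<Int, Int>> =
    Flux.just(Row(1, 10), Row(1, 20), Row(2, 30))
        .bufferUntilChanged { it.key }
        .map { rows -> rows.first().key to rows.size }
        .collectList()
        .block()!!

fun main() {
    println(countGroups()) // [(1, 2), (2, 1)]
}
```

Whether this is faster than window + collectList would need benchmarking; it mainly saves a level of nesting.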
This mimics the following imperative algorithm:
var currentValue = input.next()
var currentAccumulator = Accumulator(currentValue)
while (input.hasNext) {
    currentValue = input.next()
    if (currentValue.key == currentAccumulator.key) {
        currentAccumulator.add(currentValue)
    } else {
        output.push(currentAccumulator)
        currentAccumulator = Accumulator(currentValue)
    }
}
output.push(currentAccumulator) // flush the last group
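The sketch above can be written as runnable plain Kotlin (the name groupCounts is mine, not from the answer), which is handy for checking the expected grouping behaviour on sorted input:

```kotlin
// Counts consecutive runs of equal keys in an already-sorted list,
// mirroring the imperative accumulator loop above.
fun <T, K> groupCounts(input: List<T>, key: (T) -> K): List<Pair<K, Int>> {
    val output = mutableListOf<Pair<K, Int>>()
    val iter = input.iterator()
    if (!iter.hasNext()) return output
    var currentKey = key(iter.next())
    var count = 1
    while (iter.hasNext()) {
        val k = key(iter.next())
        if (k == currentKey) {
            count++
        } else {
            output.add(currentKey to count) // key changed: flush the finished group
            currentKey = k
            count = 1
        }
    }
    output.add(currentKey to count) // flush the last group
    return output
}

fun main() {
    println(groupCounts(listOf("a", "a", "b", "c", "c", "c")) { it })
    // [(a, 2), (b, 1), (c, 3)]
}
```

Note that, like windowUntilChanged, this only produces one entry per key when the input is sorted; on unsorted input the same key can appear in several runs.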
Here is a complete minimal working example:
import reactor.core.publisher.Flux
import java.util.IntSummaryStatistics
import java.util.concurrent.ConcurrentHashMap
import kotlin.random.Random
data class IncomingDto(val key: Int, val value: Int)
data class ResultDto(val key: Int, val valueStats: IntSummaryStatistics)
fun sortedIncoming(): Flux<IncomingDto> {
    val seed = Random.Default.nextLong()
    println("SEED: $seed")
    val rand = Random(seed)
    return Flux.push { sink ->
        val nbGroup = rand.nextInt(10, 1_000)
        for (group in 1..nbGroup) {
            val groupCount = rand.nextInt(100, 10_000)
            for (i in 1..groupCount) {
                sink.next(IncomingDto(group, rand.nextInt(1_000)))
            }
        }
        sink.complete() // without this, blockLast() below would never return
    }
}
fun Flux<IncomingDto>.statsOverSortedGroups(): Flux<ResultDto> {
    return windowUntilChanged(IncomingDto::key)
        .flatMap { groupFlow ->
            groupFlow.map { ResultDto(it.key, IntSummaryStatistics().apply { accept(it.value) }) }
                .reduce { r1, r2 ->
                    check(r1.key == r2.key)
                    ResultDto(r1.key, r1.valueStats.apply { combine(r2.valueStats) })
                }
        }
}
fun main() {
    // Verify that no duplicate key is found, by recording encountered keys in a map
    val encounteredKeys = ConcurrentHashMap<Int, Boolean>()
    sortedIncoming()
        .statsOverSortedGroups()
        .doOnNext {
            if (encounteredKeys.put(it.key, true) != null) {
                throw IllegalStateException("DUPLICATE KEY !")
            } else {
                println(it)
            }
        }
        .blockLast()
}