Flux.groupBy with sorted data: replace take(Duration.ofMillis) with e.g. takeUntil

Question (votes: 0, answers: 1)

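Given the code below, which uses Flux groupBy with .take(Duration.ofMillis(10)), I can process about 50K records per second. When testing on localhost with Flux.just, I can set the delay to 1 ms and get 500K records per second, but I don't dare do that in production.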

The data comes from a database (BigQuery) and is sorted by the grouping fields. Is there a way to avoid .take(Duration.ofMillis(10)) and its fixed delay, and use e.g. takeUntil instead?

import reactor.core.publisher.Flux
import java.time.Duration
import java.time.LocalDate

data class IncomingDto(
    val aggregation: Aggregation,
    val municipalityId: String,
    val personId: String,
    val startDate: LocalDate,
    val endDate: LocalDate? = null,
)

data class ResultDto(
    val aggregation: Aggregation,
    val municipalityId: String,
    val itemCount: Int,
)

fun reduceResult(
    someDtoFlux: Flux<IncomingDto>
): Flux<ResultDto> = someDtoFlux
    .groupBy { it.aggregation } // it is grouped by 4 fields in my prod code
    .flatMap { groupFlux ->
        groupFlux
            .take(Duration.ofMillis(10)) // https://stackoverflow.com/a/47906950/14072498
            .collectList()
            .mapNotNull { currentSomeDtoList ->
                // some reducing here

                // mock result
                ResultDto(
                    aggregation = currentSomeDtoList.first().aggregation,
                    municipalityId = currentSomeDtoList.first().municipalityId,
                    itemCount = 42
                )
            }
    }
kotlin project-reactor
1 answer (0 votes)

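If, and only if, your incoming data is already sorted by key/group/aggregation, you don't need groupBy at all. Instead, you can use windowUntilChanged to split the data sequentially, which greatly simplifies the workflow: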

someDtoFlux
    .windowUntilChanged { it.aggregation } // a new window starts whenever the key changes
    .flatMap { groupFlux -> 
        groupFlux
            .collectList()
            .mapNotNull { currentSomeDtoList ->
                // some reducing here

                // mock result
                ResultDto(
                    aggregation = currentSomeDtoList.first().aggregation,
                    municipalityId = currentSomeDtoList.first().municipalityId,
                    itemCount = currentSomeDtoList.size
                )
            }
    }
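In the question the data is grouped by four fields. With windowUntilChanged the key selector can simply return a composite key; a Kotlin data class provides structural equals, which windowUntilChanged uses to compare consecutive keys unless you pass your own comparator. A minimal sketch, assuming reactor-core on the classpath and rows sorted by every field of the key (for example through an ORDER BY on those columns in the BigQuery query); Row, GroupKey and countPerGroup are hypothetical names, and two key fields stand in for four:

```kotlin
import reactor.core.publisher.Flux

// Hypothetical row shape; personId is carried along but is not part of the key.
data class Row(val aggregation: String, val municipalityId: String, val personId: String)

// Hypothetical composite key: data class equals compares all of its fields.
data class GroupKey(val aggregation: String, val municipalityId: String)

// Counts rows per run of consecutive equal keys.
// Assumes the rows arrive sorted by (aggregation, municipalityId).
fun countPerGroup(rows: Flux<Row>): Flux<Pair<GroupKey, Int>> =
    rows
        .windowUntilChanged { GroupKey(it.aggregation, it.municipalityId) } // new window on key change
        .flatMap { window ->
            window.collectList().map { list ->
                val first = list.first()
                GroupKey(first.aggregation, first.municipalityId) to list.size
            }
        }
```

For rows keyed (A, m1), (A, m1), (A, m2), (B, m1) this yields (A, m1) to 2, (A, m2) to 1 and (B, m1) to 1. If a key reappears after a different one, for instance because the query is not sorted on all grouping fields, it shows up as a second, separate entry, which is why the sorted-input precondition matters.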

This mimics the following imperative algorithm:

var currentValue = input.next()
var currentAccumulator = Accumulator(currentValue)
while (input.hasNext()) {
    currentValue = input.next()
    if (currentValue.key == currentAccumulator.key) {
        currentAccumulator.add(currentValue)
    } else {
        // Key changed: the previous group is complete
        output.push(currentAccumulator)
        currentAccumulator = Accumulator(currentValue)
    }
}
// Flush the last group, which the loop itself never emits
output.push(currentAccumulator)
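The same algorithm as plain, runnable Kotlin, including the final flush after the loop. This is a sketch using only the standard library; groupConsecutive is a hypothetical name, and each group is a List rather than an Accumulator:

```kotlin
// Hypothetical helper: split an iterator into runs of consecutive elements
// that share the same key. Like windowUntilChanged, it assumes the input is
// sorted by key; a key that reappears later simply starts another run.
fun <T, K> groupConsecutive(input: Iterator<T>, key: (T) -> K): List<List<T>> {
    val output = mutableListOf<List<T>>()
    if (!input.hasNext()) return output          // empty input: no groups at all
    var current = mutableListOf(input.next())
    var currentKey = key(current.first())
    while (input.hasNext()) {
        val value = input.next()
        val valueKey = key(value)
        if (valueKey == currentKey) {
            current.add(value)                    // same key: extend the current group
        } else {
            output.add(current)                   // key changed: emit the finished group
            current = mutableListOf(value)
            currentKey = valueKey
        }
    }
    output.add(current)                           // flush the last group
    return output
}
```

For example, groupConsecutive(listOf(1, 1, 2, 2, 2, 1).iterator()) { it } returns [[1, 1], [2, 2, 2], [1]]: the trailing 1 starts a new group because the input is not sorted.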

Here is a complete, minimal working example:

import reactor.core.publisher.Flux
import java.util.IntSummaryStatistics
import java.util.concurrent.ConcurrentHashMap
import kotlin.random.Random

data class IncomingDto(val key: Int, val value: Int)

data class ResultDto(val key: Int, val valueStats: IntSummaryStatistics)

fun sortedIncoming() : Flux<IncomingDto> {
    val seed = Random.Default.nextLong()
    println("SEED: $seed")
    val rand = Random(seed)
    return Flux.push { sink ->
        val nbGroup = rand.nextInt(10, 1_000)
        for (group in 1..nbGroup) {
            val groupCount = rand.nextInt(100, 10_000)
            for (i in 1..groupCount) {
                sink.next(IncomingDto(group, rand.nextInt(1_000)))
            }
        }
        // Signal completion: without it the last window never closes and blockLast() hangs
        sink.complete()
    }
}

fun Flux<IncomingDto>.statsOverSortedGroups() : Flux<ResultDto> {
    return windowUntilChanged(IncomingDto::key)
        .flatMap { groupFlow ->
            // Wrap each value in a single-element IntSummaryStatistics, then merge them pairwise per window
            groupFlow.map { ResultDto(it.key, IntSummaryStatistics().apply { accept(it.value) }) }
                .reduce { r1, r2 ->
                    check(r1.key == r2.key)
                    ResultDto(r1.key, r1.valueStats.apply { combine(r2.valueStats) })
                }
        }
}

fun main() {
    // Verify that no duplicate key is found, by referencing encountered keys in a map
    val encounteredKeys = ConcurrentHashMap<Int, Boolean>()
    sortedIncoming()
        .statsOverSortedGroups()
        .doOnNext {
            if (encounteredKeys.put(it.key, true) != null) {
                throw IllegalStateException("DUPLICATE KEY !")
            } else {
                println(it)
            }
        }
        .blockLast()
}
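The map + reduce in statsOverSortedGroups allocates an IntSummaryStatistics and a ResultDto for every element. If that matters, one option (a variant, not the answer's code) is Flux.collect(Supplier, BiConsumer), which fills one mutable accumulator per window. A sketch assuming reactor-core on the classpath; StatsAcc and statsOverSortedGroupsCollect are hypothetical names, and the two DTOs are repeated so the snippet compiles on its own:

```kotlin
import reactor.core.publisher.Flux
import java.util.IntSummaryStatistics

// Same shapes as in the example above, repeated so this snippet compiles on its own.
data class IncomingDto(val key: Int, val value: Int)
data class ResultDto(val key: Int, val valueStats: IntSummaryStatistics)

// Hypothetical mutable accumulator: one instance per window.
class StatsAcc {
    var key: Int = 0
    val stats = IntSummaryStatistics()
}

// Hypothetical variant of statsOverSortedGroups; assumes the input is sorted by key.
fun Flux<IncomingDto>.statsOverSortedGroupsCollect(): Flux<ResultDto> =
    windowUntilChanged { it.key }
        .flatMap { window ->
            window
                .collect({ StatsAcc() }) { acc, dto ->
                    acc.key = dto.key            // all elements of a window share the same key
                    acc.stats.accept(dto.value)  // update count, sum, min and max in place
                }
                .filter { it.stats.count > 0 }   // defensive: ignore an empty window, should one appear
                .map { ResultDto(it.key, it.stats) }
        }
```

The trade-off is a small mutable class in place of the purely functional map/reduce chain; the output per window is the same ResultDto.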
© www.soinside.com 2019 - 2024. All rights reserved.