给定无限通量的对象,其中每个对象都有一个 ID,我如何使用 Flux 创建按 ID 属性分组的更新缓冲列表(保留最后发出的值)? 谢谢
示例
Obj(ID=A, V=1)
Obj(ID=A, V=2)
Obj(ID=B, V=3)
--- buffer -> I want to subscribe with a list of [Obj(ID=A, V=2), Obj(ID=B, V=3)]
Obj(ID=A, V=1)
Obj(ID=B, V=4)
Obj(ID=B, V=6)
Obj(ID=A, V=2)
--- buffer -> I want to subscribe with a list of [Obj(ID=B, V=6), Obj(ID=A, V=2)]
Obj(ID=B, V=1)
--- buffer -> I want to subscribe with a list of [Obj(ID=B, V=1)]
像下面这样的东西将是完美的,但它似乎在我的测试中等待通量结束而不是缓冲。
flux
.buffer(Duration.ofMillis(2000))
.groupBy(Obj::getId)
.flatMap(GroupedFlux::getLast)
.collectToList()
.subscribe(this::printList);
它与缓冲区和自定义逻辑一起使用以进行分组
public static void main(String[] args) {
flux.buffer(Duration.ofMillis(2000)).subscribe(this::groupList);
}
private void groupList(List<T> ts) {
Collection<T> values = ts.stream()
.collect(Collectors.toMap(T::getId, Function.identity(), (k, v) -> v))
.values();
System.out.println(values);
}
buffer
会发出 List<T>
,因此你可以使用非反应式 java 进行分组。例如,像您的示例中那样的 java 流。假设您的流程功能是反应性的,您可以继续流程
flux
.buffer(Duration.ofMillis(2000))
.map(list -> list.stream().collect(Collectors.toMap(Entry::getId, Function.identity(), (k, v) -> v)))
.flatMapIterable(Map::values)
.flatMap(obj -> process(obj));
有一个有点相似的用例。在 AWS S3 上有模拟数据。数据以 JSON 记录的形式存储在 512 个块中。需要下载数量可变的这些块并将它们转换为单个 JSON 数组。转换器通过返回已转换记录的数量来表示它不需要更多记录 - 当不需要更多记录时,该数量将为零。下面是两段代码。第一个使用Flux.window,而后者使用Flux.buffer。后者的执行速度约为前者的十分之一,但下载的数据超出了绝对必要的数量。
Flux.window代码
Flux<Flux<S3Object>> s3Objects = Flux.fromIterable(filteredList).window(windowSize);
Integer cumulativeSize = s3Objects.concatMap(innerS3Objects -> {
Flux <File> downloadedFiles = innerS3Objects.publishOn(Schedulers.boundedElastic()).concatMap(s3Object -> downloadS3Object(BUCKET_NAME, s3Object.key()));
return downloadedFiles.publishOn(Schedulers.single()).concatMap(downloadedFile -> {
return transformMockarooData(downloadedFile, generator, targetHourCount);
}).takeUntil(transformedRecordCount -> (transformedRecordCount == 0)).reduce((i,j) -> i + j);
}).takeUntil(transformedRecordCount -> (transformedRecordCount == 0)).reduce((i,j) -> i + j).block();
Flux.buffer代码
Flux<List<S3Object>> s3Objects = Flux.fromIterable(filteredList).buffer(windowSize);
Integer cumulativeSize = s3Objects.concatMap(innerS3Objects -> {
List <Mono<File>> toDownloadFiles = innerS3Objects.stream().map(s3Object -> downloadS3Object(BUCKET_NAME, s3Object.key())).collect(Collectors.toList());
Flux <File> downloadedFiles = Flux.merge(toDownloadFiles);
return downloadedFiles.publishOn(Schedulers.single()).concatMap(downloadedFile -> {
return transformMockarooData(downloadedFile, generator, targetHourCount);
}).takeUntil(transformedRecordCount -> (transformedRecordCount == 0)).reduce((i,j) -> i + j);
}).takeUntil(transformedRecordCount -> (transformedRecordCount == 0)).reduce((i,j) -> i + j).block();
我能够通过反应性分组实现它
flux.window(Duration.ofMillis(2000))
.flatMap(window -> window.groupBy(Entry::getId)
.flatMap(GroupedFlux::last)
.collectList()
)
.subscribe(this::printList);