如何在Java中对Reactor Flux中的元素进行缓冲和分组

问题描述 投票:0回答:3

给定无限通量的对象,其中每个对象都有一个 ID,我如何使用 Flux 创建按 ID 属性分组的更新缓冲列表(保留最后发出的值)? 谢谢

示例

    Obj(ID=A, V=1)
    Obj(ID=A, V=2)
    Obj(ID=B, V=3) 
    --- buffer -> I want to subscribe with a list of [Obj(ID=A, V=2), Obj(ID=B, V=3)]
    Obj(ID=A, V=1)
    Obj(ID=B, V=4)
    Obj(ID=B, V=6)
    Obj(ID=A, V=2)
    --- buffer -> I want to subscribe with a list of [Obj(ID=B, V=6), Obj(ID=A, V=2)]
    Obj(ID=B, V=1)
    --- buffer -> I want to subscribe with a list of [Obj(ID=B, V=1)]

像下面这样的东西将是完美的,但它似乎在我的测试中等待通量结束而不是缓冲。

flux
    .buffer(Duration.ofMillis(2000))
    .groupBy(Obj::getId)
    .flatMap(GroupedFlux::getLast)
    .collectToList()
    .subscribe(this::printList);

它与缓冲区和自定义逻辑一起使用以进行分组

    public static void main(String[] args) {
        flux.buffer(Duration.ofMillis(2000)).subscribe(this::groupList);
    }

    private void groupList(List<T> ts) {
        Collection<T> values = ts.stream()
                .collect(Collectors.toMap(T::getId, Function.identity(), (k, v) -> v))
                .values();
        System.out.println(values);
    }
java project-reactor reactor
3个回答
0
投票

buffer
会发出
List<T>
,因此你可以使用非反应式 java 进行分组。例如,像您的示例中那样的 java 流。假设您的流程功能是反应性的,您可以继续流程

flux
    .buffer(Duration.ofMillis(2000))
    .map(list -> list.stream().collect(Collectors.toMap(Entry::getId, Function.identity(), (k, v) -> v)))
    .flatMapIterable(Map::values)
    .flatMap(obj -> process(obj));

0
投票

有一个有点相似的用例。在 AWS S3 上有模拟数据。数据以 JSON 记录的形式存储在 512 个块中。需要下载数量可变的这些块并将它们转换为单个 JSON 数组。转换器通过返回已转换记录的数量来表示它不需要更多记录 - 当不需要更多记录时,该数量将为零。下面是两段代码。第一个使用Flux.window,而后者使用Flux.buffer。后者的执行速度约为前者的十分之一,但下载的数据超出了绝对必要的数量。

Flux.window代码

    Flux<Flux<S3Object>> s3Objects = Flux.fromIterable(filteredList).window(windowSize);
    Integer cumulativeSize = s3Objects.concatMap(innerS3Objects -> {
        Flux <File> downloadedFiles = innerS3Objects.publishOn(Schedulers.boundedElastic()).concatMap(s3Object -> downloadS3Object(BUCKET_NAME, s3Object.key()));
        return downloadedFiles.publishOn(Schedulers.single()).concatMap(downloadedFile -> {
            return transformMockarooData(downloadedFile, generator, targetHourCount);
        }).takeUntil(transformedRecordCount -> (transformedRecordCount == 0)).reduce((i,j) -> i + j);               
    }).takeUntil(transformedRecordCount -> (transformedRecordCount == 0)).reduce((i,j) -> i + j).block();

Flux.buffer代码

    Flux<List<S3Object>> s3Objects = Flux.fromIterable(filteredList).buffer(windowSize);
    Integer cumulativeSize = s3Objects.concatMap(innerS3Objects -> {
        List <Mono<File>> toDownloadFiles = innerS3Objects.stream().map(s3Object -> downloadS3Object(BUCKET_NAME, s3Object.key())).collect(Collectors.toList());
        Flux <File> downloadedFiles = Flux.merge(toDownloadFiles);
        return downloadedFiles.publishOn(Schedulers.single()).concatMap(downloadedFile -> {
            return transformMockarooData(downloadedFile, generator, targetHourCount);
        }).takeUntil(transformedRecordCount -> (transformedRecordCount == 0)).reduce((i,j) -> i + j);               
    }).takeUntil(transformedRecordCount -> (transformedRecordCount == 0)).reduce((i,j) -> i + j).block();

-1
投票

我能够通过反应性分组实现它

flux.window(Duration.ofMillis(2000))
    .flatMap(window -> window.groupBy(Entry::getId)
        .flatMap(GroupedFlux::last)
        .collectList()
    )
    .subscribe(this::printList);
© www.soinside.com 2019 - 2024. All rights reserved.