How to batch a Flux and process each batch in parallel


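I have 5,000,000 entities in my database, and I connect to it through a reactive driver (r2dbc). I want to fetch them 100,000 at a time, split each chunk into bundles of 1,000 entities, and send each bundle on for further processing. Once a chunk of 100,000 has been processed, the next 100,000 should be fetched. Loading more than 100,000 at once causes an OOM.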

BIG_DATA -get 100,000-> make batches of 1,000 (in parallel) -> process. If successful, get more.
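A minimal sketch of the flow described above, assuming an in-memory `Flux.range` as a stand-in for the R2DBC query. `PagedBatchPipeline`, `fetchPage`, and `processBatch` are hypothetical names, not part of r2dbc or Reactor. Each page is subscribed to only after every batch of the previous page has finished (`concatMap`), `buffer(batchSize)` forms the bundles, and `flatMap(..., parallelism)` caps how many bundles are processed at the same time.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

final class PagedBatchPipeline {

    // Hypothetical stand-in for a paged R2DBC query (e.g. LIMIT/OFFSET); emits ids offset..offset+count-1.
    static Flux<Integer> fetchPage(int offset, int limit, int total) {
        int count = Math.max(0, Math.min(limit, total - offset));
        return Flux.range(offset, count);
    }

    // Hypothetical stand-in for "send the bundle further"; runs on a worker thread and reports the bundle size.
    static Mono<Integer> processBatch(List<Integer> batch) {
        return Mono.fromCallable(batch::size)
                .subscribeOn(Schedulers.boundedElastic());
    }

    /** Processes `total` ids page by page and returns how many were processed. */
    static int run(int total, int pageSize, int batchSize, int parallelism) {
        int pages = (total + pageSize - 1) / pageSize;
        return Flux.range(0, pages)
                // concatMap: the next page is fetched only after the current page's inner Flux completes
                .concatMap(page -> fetchPage(page * pageSize, pageSize, total)
                        // group the page into lists of up to batchSize elements
                        .buffer(batchSize)
                        // process at most `parallelism` batches concurrently
                        .flatMap(PagedBatchPipeline::processBatch, parallelism))
                .reduce(0, Integer::sum)
                .block();
    }
}
```

With the numbers from the question this would be `PagedBatchPipeline.run(5_000_000, 100_000, 1_000, 4)`. If a batch fails, the error cancels the whole pipeline, so no further pages are requested.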

But when I try to simulate it, all of the source data is fetched first, and only then is it split into batches:

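import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class DataMigrationServiceApplication {

    private static final Logger log = LoggerFactory.getLogger(DataMigrationServiceApplication.class);

    public static void main(String[] args) throws InterruptedException {
        Flux.range(0, 12)
                .limitRate(6)
                .doOnNext(in -> log.info("NEW"))
                .parallel(2)
                .runOn(Schedulers.fromExecutor(Executors.newFixedThreadPool(2)))
                // collect() fills one list per rail and emits it only when that rail completes,
                // so every element is pulled before any "IN" is logged
                .collect(() -> new ArrayList<Integer>(), List::add)
                .flatMap(DataMigrationServiceApplication::check)
                .subscribe();

        Thread.sleep(10000);
    }

    private static Mono<List<Integer>> check(List<Integer> input) {
        return Mono.defer(() -> Mono.just(input))
                .doOnNext(in -> log.info("IN {}", in))
                .subscribeOn(Schedulers.parallel());
    }
}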

Logs

16:53:57.871 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.874 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.874 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.874 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.874 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.874 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.875 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.875 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.875 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.875 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.875 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.875 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.881 [pool-1-thread-2] INFO DataMigrationServiceApplication -- IN [1, 3, 5, 7, 9, 11]
16:53:57.881 [pool-1-thread-1] INFO DataMigrationServiceApplication -- IN [0, 2, 4, 6, 8, 10]
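These logs match how `ParallelFlux.collect` behaves: it fills one list per rail and emits it only when that rail completes, so with a finite source every element is pulled before any list is emitted. The small sketch below assumes the same 12-element source and swaps in `Flux.buffer`, which emits each list as soon as it is full. It records the order of events so the difference is visible. `IncrementalBatches` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

final class IncrementalBatches {

    /** Records "NEW" per element and "IN <batch>" per batch, in the order they happen. */
    static List<String> run(int count, int batchSize) {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        Flux.range(0, count)
                .doOnNext(i -> events.add("NEW"))
                // buffer emits a list as soon as it holds batchSize elements,
                // instead of waiting for the whole source to complete
                .buffer(batchSize)
                // sequential, synchronous "processing" so the recorded order is deterministic
                .concatMap(batch -> Mono.fromRunnable(() -> events.add("IN " + batch)))
                .blockLast();
        return events;
    }
}
```

`IncrementalBatches.run(12, 6)` records six `NEW` events, then `IN [0, 1, 2, 3, 4, 5]`, then six more `NEW` events and `IN [6, 7, 8, 9, 10, 11]`. To process the batches in parallel, `concatMap` can be replaced with `flatMap` plus `subscribeOn` on the inner publisher.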

I tried limitRate, buffer, and window.

Please help.

java spring-boot project-reactor r2dbc
1 Answer

Well, I did this:

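        generator
            .window(20)                                      // inner Flux of 20 elements each
            .flatMap(inFlux -> inFlux
                    .parallel(4)                             // 4 rails per window
                    .runOn(/*scheduler*/)
                    .collect(() -> new ArrayList<Integer>(), List::add) // one list per rail, emitted when the window completes
                    .flatMap(/*process*/))
            .subscribe();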

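This splits the Flux into windows of 20 elements; within each window, 4 rails (parallel) take elements in round-robin fashion. Each rail gets 5 elements and processes them.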
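A runnable sketch of the answer's approach, assuming `Flux.range(0, total)` as the `generator` and a hypothetical `process` step that only reports each batch's size; `WindowedParallelBatches` is likewise a hypothetical name. One deliberate change: it uses `concatMap` instead of the answer's `flatMap`. With its default concurrency (256), `flatMap` can request many windows up front, and therefore many more elements than a single window. `concatMap` handles one window at a time.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

final class WindowedParallelBatches {

    // Hypothetical processing step: reports how many elements the batch had.
    static Mono<Integer> process(List<Integer> batch) {
        return Mono.just(batch.size());
    }

    /** Returns the size of every processed batch. */
    static List<Integer> run(int total, int windowSize, int rails) {
        Scheduler scheduler = Schedulers.newParallel("batch", rails);
        try {
            return Flux.range(0, total)                       // stand-in for the answer's `generator`
                    .window(windowSize)                      // one inner Flux per windowSize elements
                    .concatMap(window -> window              // one window in flight at a time
                            .parallel(rails)                 // split the window across `rails` rails
                            .runOn(scheduler)
                            .collect(ArrayList<Integer>::new, List::add) // one list per rail
                            .flatMap(WindowedParallelBatches::process)
                            .sequential())                   // merge the rails back into a Flux
                    .collectList()
                    .block();
        } finally {
            scheduler.dispose();
        }
    }
}
```

`WindowedParallelBatches.run(100, 20, 4)` yields 20 batch sizes (5 windows × 4 rails) that sum to 100.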
