如何在Reactor中进行多线程文件处理

问题描述 投票:0回答:1

我正在尝试使用Reactor的Flux并行处理多个文件。主要工作量发生在对flatMap的调用中,然后对通量进行转换和过滤。

[每当我尝试订阅生成的Flux时,主线程都会在收到任何值之前退出。

Flux.fromStream(Files.list(Paths.get("directory"))
    .flatMap(path -> { 
        return Flux.create(sink -> {
            try (
                RandomAccessFile file = new RandomAccessFile(new File(path), "r");
                FileChannel fileChannel = file.getChannel()
            ) {
                // Process file into tokens
                sink.next(new Token(".."));
            } catch (IOException e) {
                sink.error(e);
            } finally {
                sink.complete();
            }
        }).subscribeOn(Schedulers.boundedElastic());
    })
    .map(token -> /* Transform tokens */)
    .filter(token -> /* Filter tokens*/)
    .subscribe(token -> /* Store tokens in list */)

我希望在列表中找到处理管道的输出,但是程序立即退出。首先,我想知道我是否正确使用了Flux类,其次,我将如何等待订阅调用完成?

java blocking project-reactor
1个回答
0
投票

我希望可以在列表中找到处理管道的输出,但是程序会立即退出。

您那里的代码设置在主线程上的反应链,然后在主线程上不执行其他任何操作。因此,主线程完成了其工作,并且由于boundedElastic()线程是守护程序线程,因此没有其他线程阻止程序退出,因此它退出了。

您可以通过一个简单得多的示例看到相同的行为:

Flux<Integer> f = Flux.just(1, 2, 3, 4, 5)
            .delayElements(Duration.ofMillis(500));
f.subscribe(System.out::println);

您当然可以调用newBoundedElastic("name", false)使其成为non-daemon支持的调度程序,但是随后您必须跟踪它并在完成后调用dispose,因此它实际上只是在解决问题(该程序将无限运行,直到您处置调度程序为止。)

快速的'n'肮脏解决方案只是阻塞Flux的最后一个元素,作为您程序的最后一行-因此,如果我们添加:

f.blockLast();

...然后程序在退出之前等待最后一个元素被发出,并且我们具有我们要遵循的行为。

对于一个简单的概念证明,这很好。但是,这在“生产”代码中并不理想。首先,“无阻塞”是响应式代码中的通用规则,因此,如果您有这样的阻塞调用,则很难确定是否是有意的。如果添加了其他链,并且还希望它们完成,则必须为每个链添加阻塞呼叫。这很麻烦,而且不是真正可持续的。

更好的解决方案是使用CountDownLatch

CountDownLatch cdl = new CountDownLatch(1);

Flux.just(1, 2, 3, 4, 5)
        .delayElements(Duration.ofMillis(500))
        .doFinally(s -> cdl.countDown())
        .subscribe(System.out::println);

cdl.await();

这具有不显式阻止的优点,并且能够一次处理多个发布者(如果您将初始值设置为大于1。)这也是我通常推荐的这种方法东西-因此,如果您想要最广泛接受的解决方案,那就是它。

但是,对于所有需要等待多个发布者(而不是一个发布者)的示例,我倾向于使用Phaser-它的工作方式类似于CountdownLatch,但可以动态地register()deregister()。这意味着您可以创建单个相位器,然后根据需要轻松地向其注册多个发布者,而无需更改初始值,例如:

Phaser phaser = new Phaser(1);

Flux.just(1, 2, 3, 4, 5)
        .doOnSubscribe(s -> phaser.register())
        .delayElements(Duration.ofMillis(500))
        .doFinally(s -> phaser.arriveAndDeregister())
        .subscribe(System.out::println);

Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
        .doOnSubscribe(s -> phaser.register())
        .delayElements(Duration.ofMillis(500))
        .doFinally(s -> phaser.arriveAndDeregister())
        .subscribe(System.out::println);

phaser.arriveAndAwaitAdvance();

((当然,如果需要,您也可以将onSubscribedoFinally逻辑包装在单独的方法中。)

© www.soinside.com 2019 - 2024. All rights reserved.