如何将 Flux 转换为具有多个计数器的 Mono

问题描述 投票:0回答:1

我们正在尝试计算 Flux 上的不同事物。我们目前使用 AtomicIntegers 来进行计数,但是一旦 Flux 完成,我们如何才能获得这些值呢?我尝试了

.then
.collectList().flatMap
和其他东西,但似乎我返回的 Mono 总是在 AtomicIntegers 仍为 0 时创建。如果我只是
subscribe()
在通量上然后返回一个新的 Mono,它会起作用,但是这样做没有意义。我什至尝试过
Mono.defer
但没有运气。 有没有办法让我想念?

不工作:

var successCount = new AtomicInteger();
var errorCount = new AtomicInteger();
var filteredRecords = new HashMap<String, List<String>>();
try (var reader = new CSVReader(new InputStreamReader(new ZippedS3ObjectStream(is)))) {
  asyncS3InventoryStreamingService.validatedHeaderRecord(reader.readNextSilently(), fileName);
  return Flux.fromIterable(reader)
      .map(this::toRecord)
      .filter(record -> filterRecords(record, filteredRecords))// add filtered records to a List
      .map(this::processRecord)
      .doOnNext(result -> successCount.getAndIncrement())// count successfully processed records
      .onErrorContinue((ex, record) -> {
        log.error(ERROR_RECORD, StructuredArguments.v("record", record), ex);
        errorCount.getAndIncrement();
      }).collectList().flatMap(resultList -> Mono.defer(() -> checkErrorsAndLog(successCount.get(), errorCount.get(), filteredRecords, fileName)));
      //}).then(checkErrorsAndLog(successCount.get(), errorCount.get(), filteredRecords, fileName));// alternate approach
} catch (CsvValidationException | IOException exception) {
  log.error(ERROR_LOG_MSG, StructuredArguments.value(FILE_NAME, fileName),
      StructuredArguments.value(RECORD_COUNT, 0), exception);
  return Mono.error(new ProcessingException(String.format(ERROR_PROCESSING_FAILED, fileName), exception));
}

只有这个有效:

}).subscribe();
return checkErrorsAndLog(successCount.get(), errorCount.get(), filteredRecords, fileName);
java project-reactor
1个回答
0
投票
  1. 如果您只是创建一个

    Flux
    (或任何其他发布者),它不会自动开始处理数据,但您必须
    subscribe
    ,以便它开始使用数据。

  2. 但通常你只是返回一个发布者(即你在这里建立的 Flux/Mono)。直到最后一步,当您拥有产生程序最终结果的最终流程时,您才需要

    subscribe

  3. 处理不在您启动它时在同一个线程中运行(除非您专门为其配置)。因此,如果您执行

    subscribe
    事件,它会立即继续执行下一行代码,这会关闭
    reader
    而没有给足够的时间来使用它。

  4. 看来您将同步代码与异步代码混合在一起,并且它不会正常工作。如果您使用资源,即打开-阅读-关闭某些东西,您必须作为发布者的一部分进行操作。见

    Flux.using(...)
    方法。

  5. 如果您只想返回忽略数据本身的计数器,您可以在主

    Mono
    之后创建一个新的
    Flux

___ 赞:

.collectList() // I'm not sure you need it, because you're ignoring them
.then(Mono.fromCallable {
     Tuples.of(successCount.get(), errorCount.get())
}) 
© www.soinside.com 2019 - 2024. All rights reserved.