我们正在尝试计算 Flux 上的不同事物。我们目前使用 AtomicIntegers 来进行计数,但是一旦 Flux 完成,我们如何才能获得这些值呢?我尝试了
.then
、.collectList().flatMap
和其他东西,但似乎我返回的 Mono 总是在 AtomicIntegers 仍为 0 时创建。如果我只是 subscribe()
在通量上然后返回一个新的 Mono,它会起作用,但是这样做没有意义。我什至尝试过 Mono.defer
但没有运气。
有没有办法让我想念?
不工作:
var successCount = new AtomicInteger();
var errorCount = new AtomicInteger();
var filteredRecords = new HashMap<String, List<String>>();
try (var reader = new CSVReader(new InputStreamReader(new ZippedS3ObjectStream(is)))) {
asyncS3InventoryStreamingService.validatedHeaderRecord(reader.readNextSilently(), fileName);
return Flux.fromIterable(reader)
.map(this::toRecord)
.filter(record -> filterRecords(record, filteredRecords))// add filtered records to a List
.map(this::processRecord)
.doOnNext(result -> successCount.getAndIncrement())// count successfully processed records
.onErrorContinue((ex, record) -> {
log.error(ERROR_RECORD, StructuredArguments.v("record", record), ex);
errorCount.getAndIncrement();
}).collectList().flatMap(resultList -> Mono.defer(() -> checkErrorsAndLog(successCount.get(), errorCount.get(), filteredRecords, fileName)));
//}).then(checkErrorsAndLog(successCount.get(), errorCount.get(), filteredRecords, fileName));// alternate approach
} catch (CsvValidationException | IOException exception) {
log.error(ERROR_LOG_MSG, StructuredArguments.value(FILE_NAME, fileName),
StructuredArguments.value(RECORD_COUNT, 0), exception);
return Mono.error(new ProcessingException(String.format(ERROR_PROCESSING_FAILED, fileName), exception));
}
只有这个有效:
}).subscribe();
return checkErrorsAndLog(successCount.get(), errorCount.get(), filteredRecords, fileName);
如果您只是创建一个
Flux
(或任何其他发布者),它不会自动开始处理数据,但您必须subscribe
,以便它开始使用数据。
但通常你只是返回一个发布者(即你在这里建立的 Flux/Mono)。直到最后一步,当您拥有产生程序最终结果的最终流程时,您才需要
subscribe
。
处理不在您启动它时在同一个线程中运行(除非您专门为其配置)。因此,如果您执行
subscribe
事件,它会立即继续执行下一行代码,这会关闭 reader
而没有给足够的时间来使用它。
看来您将同步代码与异步代码混合在一起,并且它不会正常工作。如果您使用资源,即打开-阅读-关闭某些东西,您必须作为发布者的一部分进行操作。见
Flux.using(...)
方法。
如果您只想返回忽略数据本身的计数器,您可以在主
Mono
之后创建一个新的Flux
。
___ 赞:
.collectList() // I'm not sure you need it, because you're ignoring them
.then(Mono.fromCallable {
Tuples.of(successCount.get(), errorCount.get())
})