我有一系列网络客户端调用,如下所示。
Flux<RecordA>
List<Long>
)Flux<RecordA>
对象值。那么如何将通量从步骤 1 传递到步骤 4?
通量是数据记录流。在每条记录上,您描述一个处理步骤列表,以计算上一步的新状态/记录。
如果可能,我建议您使用新状态复制记录,而不是尝试更新现有实例。
举个例子:
record RecordA(Long id, List<Long> tagIds, List<String> tagDescriptions)
public static Mono<String> fetchTagDescription(Long tagId) {
if (tagId == null) return Mono.error(new IllegalArgumentException("Null ids not accepted"));
// Mimic external API call
return Mono.just("Description for tag "+tagId);
}
Flux<RecordA
时,您可以获取每条记录的标签描述,然后创建包含标签描述的最新副本,如下所示:public static Flux<RecordA> updateTagDescription(Flux<RecordA> records) {
return records.flatMap(record -> {
if (record.tagIds() == null || record.tagIds().isEmpty()) {
return Mono.just(record);
}
return Flux
.fromIterable(record.tagIds())
.concatMap(tagId -> fetchTagDescription(tagId))
.collectList()
.map(descriptions -> new RecordA(record.id(), record.tagIds(), descriptions));
});
}