project-reactor 相关问题

Reactor是一个基础库,用于构建JVM上的反应式快速数据应用程序。它提供了Java,Groovy和其他JVM语言的抽象,使构建事件和数据驱动的应用程序更容易。它也非常快。

使用项目反应器实现过滤器

我有一个来自 Mongo 的项目列表,我需要通过远程调用结果过滤它们,如下所示: repository.findByString(String string) // Flux .filter(a -> {

回答 1 投票 0

使用反应堆即发即忘

我的 Spring boot 应用程序中有如下方法。 公共 Flux 搜索(SearchRequest 请求){ Flux result = searchService.search(request);//返回 Flux ...

回答 3 投票 0

.subscribe(-) 与 .map(-).subscribe()

我正在阅读有关使用 Mono 和 Flux 上的转换运算符转换数据的项目反应器文档。不过我可以帮忙问一下这个问题.. 做逻辑有什么区别...

回答 1 投票 0

如何在Webflux中同步rest方法?

问题在于,同时调用updateUser时,会出现错误的余额更新。 2023-09-03T18:20:53.035+03:00 信息 12668 --- [actor-tcp-nio-1] com.example.testsync.UserCont...

回答 1 投票 0

使用 StepVerifier 进行简单服务时,Groovy Spock 测试失败

我正在尝试测试我的服务 ProductService,但它失败,表明断言是 MonoJust 而不是我期望的产品。我不知道为什么测试因该问题而失败。我

回答 1 投票 0

如果找到值则返回 Mono.empty() 但不执行其他步骤

这个问题很难用文字描述,所以如果标题不符合要求,抱歉。 我想通过 Project Reactor Flux 和 Mono 实现特定目标,这似乎...

回答 2 投票 0

使用带有 lambda 的反应式应用程序时出现问题

我对在 Java 中使用响应式还很陌生。下面是我面临的问题。 我开发了一个带有控制器、服务和存储层以及 R2DBC 驱动程序的 springboot-REST 应用程序。 ...

回答 1 投票 0

在组装阶段仅订阅发布者并保持不变是否安全?

我已经在 Gitter 中询问过,但看起来这些天不太活跃.. 我很好奇,在生产代码中使用此类构造是否安全: 私有 Mono someHandler() {

回答 1 投票 0

Java Spring WebFlux 与 RxJava

我开始学习 Java 响应式编程。整个反应范式对我来说是新的。 在我的学习过程中,我遇到过一些术语/库,例如 Spring WebFlux、projectreactor...

回答 4 投票 0

多部分文件 api 不支持的媒体类型

POSTMAN 总是给出 415 unsupported media type 错误。标头包含带有边界的 multipart/form-data,如下面的 CURL 调用所示。还尝试用 RequestBody 替换 RequestPart...

回答 2 投票 0

反应器迭代通量直到不为空

我有带有字符串值的反应器通量,我迭代每个值并检查数据库中是否存在给定值。 我想迭代每个值,直到在数据库中找到该值。 如果没有找到(之后...

回答 1 投票 0

intellij 不存在类型变量 R 的实例,因此 Flux<R> 符合 Publisher<? extends DataBuffer> 问题

我正在尝试使用带有reactor和spring的java将Mono转换为Mono。 我有一个可以编译并正常工作的代码,但它会生成 Intellij 错误。 该...

回答 1 投票 0

使用 webClient 的 HttpRequest 和 HttpResponse 的自定义日志格式

我需要以自定义格式记录 HttpRequest 正文和 HttpResponse 正文。 例如 “发送请求。标头:{},正文:{},uri:{}” 如何获取请求正文? 我试试这个: log.info("发送

回答 1 投票 0

Reactive Java:在运行时更改窗口大小

我每 1 秒就有一个 Flux 发射物品,重复。如何动态改变窗口大小? 公共静态无效主(字符串[] args)抛出InterruptedException { 持续时间间隔 =

回答 1 投票 0

Webflux Reactor如何合并Stream.map输出,可以是Mono和Flux成Flux

A 用于获取源。在下面的映射函数中,如果拥有源,则源是 Flux,否则源是 Mono。函数根据A的List、列表大小...

回答 1 投票 0

从 Flux 的某些元素创建 Mono

我正在尝试从 Flux 的元素创建 Mono。该单声道需要包含一定数量的元素。 我找到了一个解决方案,那就是使用 Mono.create 方法、takeUntil 和 onCancel

回答 1 投票 0

有没有办法使用 Mono 从 WebClient 调用缓存响应?

如果输入相同,有没有办法缓存调用的响应?例如,如果文件名相同,我不想再次使用我的 webClient 发出请求,因为它需要......

回答 1 投票 0

如何仅从反应流中发出累积和?

我有一个用例,其中流应该仅在累积“总和”等于或超过给定值 n 时发出。让我们以 n = 5 的六个整数为例。 +---+------+---------+ |...

回答 3 投票 0

如果满足异步条件,如何映射通量停止?

考虑我有大量整数,并且此方法模拟异步外部 api 数据检索,它可以为某些特定的未知输入返回空响应: 公共静态单声道 考虑到我有大量的整数,并且此方法模拟异步外部 api 数据检索,它可以为某些特定的未知输入返回空响应: public static Mono<String> getApiData(int i) { if (i == 3) return Mono.empty(); // i'm using 3 just as an example return Mono.just(String.valueOf(i * 2)); } 这些方法将根据结果执行 getApiData 输出: // when getApiData returns non empty mono public static Mono<Boolean> updateDatabaseWithApiData(int apiInput, String apiOutput) { System.out.println(apiInput + " -> " + apiOutput); // lots of unrelated logic return Mono.just(true); } // when getApiData returns empty mono public static Mono<Boolean> logFailure(int apiInput) { System.out.println(apiInput + " -> failure"); // registering errors logs return Mono.just(false); } 用这个我想编写一个像这样的方法Mono<Boolean> processFluxUntilFailure(Flux<Integer> flux),它对每个元素应用getApiData,并且在发生故障时停止。那么如果至少有一个元素达到updateDatabaseWithApiData,则返回Mono.just(true),否则返回Mono.just(false)。 所以我会得到这个输出: public static void main(String[] args) { Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5); processFluxUntilFailure(flux).subscribe(value -> System.out.println("result " + value)); } 所需输出: 1 -> 2 2 -> 4 3 -> failure result true 因为我们已经处理了(至少 1)2 个成功的元素。 考虑到: 这是我真正问题的简化版本 我无法预测数据何时会为空 getApiData 我无法改变所描述的方法,只能processFluxUntilFailure 我试过这个: public static Mono<Boolean> processFluxUntilFailure(Flux<Integer> flux) { return flux.flatMap(apiInput -> getApiData(apiInput) .flatMap(apiOutput -> updateDatabaseWithApiData(apiInput, apiOutput)) .switchIfEmpty(Mono.defer(() -> logFailure(apiInput))) ) .reduce((b1, b2) -> b1 || b2); } 这导致了 1 -> 2 2 -> 4 3 -> failure 4 -> 8 5 -> 10 result true 我怎样才能从这次尝试中获得我想要的输出?换句话说,如果满足某些异步条件,我如何“停止”flatMap? 如果有任何不那么冗长的方法,我很乐意: public static Mono<Boolean> processAndUpdate(Flux<Integer> flux) { return flux .flatMap(apiInput -> getApiData(apiInput) .flatMap(apiOutput -> updateDatabaseWithApiData(apiInput, apiOutput)) .switchIfEmpty(Mono.defer(() -> logFailure(apiInput))) ).<Boolean>handle((b, sink) -> { if (b) sink.next(true); else sink.complete(); }) .defaultIfEmpty(false) .reduce((b1, b2) -> b1 || b2); } 给我这个输出: 1 -> 2 2 -> 4 3 -> failure result true 显然 flatMap 和 handle 并没有急于评估,将它们放在一起就成功了

回答 1 投票 0

使用 Flux.groupBy 排序数据,将 take(Duration.ofMillis) 替换为例如直到

考虑到下面的代码我使用 Flux groupBy 和 .take(Duration.ofMillis(10)),我每秒可以处理大约 50K 记录。在使用 Flux.just 在 localhost 上进行测试时,我可以将延迟值设置为 1 m...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.