project-reactor 相关问题

Reactor是一个基础库,用于构建JVM上的反应式快速数据应用程序。它提供了Java,Groovy和其他JVM语言的抽象,使构建事件和数据驱动的应用程序更容易。它也非常快。

Spring Boot 3 / reactor - 在 WebFilter 中访问跟踪

使用 spring-boot-starter-parent 3.0.6 和 spring-boot-starter-webflux 弹簧启动启动器安全 微米追踪 千分尺示踪桥otel 我也使用 Hooks.enableAutomaticContextPropa ...

回答 0 投票 0

缓冲流中的唯一值,直到它不再唯一 - 在 groupby 中使用缓存

我有一个卡夫卡话题的通量。所以我必须处理大量的无限热源。不过,为了简单起见,我将把它与整数通量结合起来。 我想把这个通量变成一个

回答 1 投票 0

Reactor Mono Zip 替代方案(代码质量改进)

我能以某种方式简化或改进这段代码吗?基本问题是我需要构建一个包含来自异步调用的多个值的 DTO。单声道和单声道 // getMovie 返回 Mono 返回

回答 0 投票 0

在正确的线程上执行 onErrorResume

我正在实施死信逻辑,但在使用 webClient 调用外部服务时发生异常后,无法确认记录 订单消费者 。收到() .concatMa...

回答 0 投票 0

在递归调用中解压 Mono?

我正在处理一个应该调用下游服务的服务,并且根据该下游服务的响应,将调用我的下一个递归。 这是参考的示例代码...

回答 0 投票 0

Flux.sample(with Duration) 和 Flux.sampleTimeout() 有什么区别?

此方法的 2 个重载: public final Flux sampleTimeout(Function> throttlerFactory) 和 公开决赛 Flux

回答 0 投票 0

Mono 返回的错误没有被 onErrorResume 捕获

我正在使用 RxJava 2 通过 micronaut 框架进行反应式编程。我试图了解以下代码的问题: 有趣的 getItemDetails( itemRequest:请求到 ): 单声道 我正在使用 RxJava 2 通过 micronaut 框架进行反应式编程。我试图理解以下代码的问题: fun getItemDetails( itemRequest: RequestTo ): Mono<List<ResponseTo>> { return itemRequest.itemIdList.toFlux() .flatMap { findItemStatus(it) } .flatMap { response -> Mono.justOrEmpty(ResponseTo(response)) .onErrorResume { logger.error("Error occurred while mapping entity to response: $response", it) Mono.empty() } } .collectList() } RequestTo:由一个String列表组成。 这个想法是迭代这个列表,从数据库中获取每个项目的详细信息(通过返回数据库实体的findItemStatus()函数调用处理)并将其映射到响应对象。 我想这样工作,如果 findItemStatus() flatMap 操作返回的任何实体在将其映射到 ResponseTo 时导致异常,我希望在最终列表中跳过该项目但继续处理通量中的剩余项目.我不想将 onErrorResume 块保留在第二个 flatMap 之外,因为如果通量中的任何中间实体在映射时导致错误并且通量中的其余项目将不会被映射,这将关闭流。 我不确定我的代码有什么问题,但是当 Mono.justOrEmpty(ResponseTo(response)) 中抛出异常时,它不会在 onErrorResume 中被捕获。 我把测试用例放在这里: def "test getItemDetails() when there is an error mapping DB response to Response TO for any item"() { def itemEntity1 = new DiscontinuedItemsEntity("52782904", null, null, UUID.randomUUID()) def itemEntity2 = new DiscontinuedItemsEntity("52778904", null, null, UUID.randomUUID()) def requestBody = new DiscontinuedItemsTcinRequest(["52782904", "52782904"]) when: def response = itemsStatusService.getItemDetails(requestBody).block() def responseList = response.itemsList then: 1 * discontinuedItemsRepository.findDiscontinuedItems(*_) >> Mono.just(itemEntity1) 1 * discontinuedItemsRepository.findDiscontinuedItems(*_) >> Mono.just(itemEntity2) responseList.size() == 1 responseList.get(0).itemId == itemEntity1.itemId } 我需要这个测试用例通过,但是异常被抛出并且没有被处理,因为它没有被 onErrorResume 捕获

回答 0 投票 0

如何理解Reactor中的Flux.sampleTimeout方法?

此方法的 2 个重载: public final Flux sampleTimeout(Function> throttlerFactory) 和 公开决赛 Flux

回答 0 投票 0

Project Reactor:retryWhen() 的超时未按预期工作

下面的代码使用project reactor来模拟一个简单的交易,我们首先使用newTrx()来启动一个trx,然后调用getTrxResult()来获取trx的结果。 请注意,我们将重试 doGet() s ...

回答 0 投票 0

Flux with repeatWhen 使用 onCancel 永远不会终止

我想用我的代码做什么 我想构建一个无限的 Flux 来检查条件(功能标志),然后根据条件是否为真继续处理请求。 T...

回答 1 投票 0

Reactor的FlatMap是异步的吗

我是反应式编程的新手,我正在通过 micronaut 框架和 kotlin 使用反应堆。我正在尝试了解响应式编程的优势以及我们如何使用 Map 和 F 实现它...

回答 1 投票 0

下面的代码块有什么区别

我是响应式编程的新手,想了解一些基础知识。 @测试 公共无效 testMonoThen() { Mono fromRunnable = Mono.fromRunnable(() -> log.info("

回答 1 投票 0

java中flux.subscribe如何实现wait/notify

我刚开始使用 java 进行函数式编程,遇到了一些困难。 我正在编写一种方法来建立与数据库的反应性会话并将通量对象返回给调用者。 来电...

回答 0 投票 0

如何用spring cloud gateway拦截websocket消息

我在处理 websocket 连接的聊天服务前面运行云网关服务。 我们有没有一些方便的方法来记录套接字客户端和聊天服务之间发生的交互...

回答 2 投票 0

如何将 Flux 转换为具有多个计数器的 Mono

我们正在尝试计算 Flux 上的不同事物。我们目前使用 AtomicIntegers 来进行计数,但是一旦 Flux 完成,我们如何才能获得这些值呢?我试过了。然后,.collectList().flatMap ...

回答 1 投票 0

反应器背压 - 阻塞直到压力释放

我正在尝试使用 Flux 来处理数据库中的记录并将它们传递给 Kinesis 流——这非常有效,但如果我的 Kinesis 客户得到“Ba ...

回答 1 投票 0

为什么 Webflux 'then' 运算符不执行?

public Mono saveAccount(Mono requestMono){ var reqMono = requestMono.map(AccountSaveRequest::convertEntity); 返回 reqMono.flatMap(req ->

回答 0 投票 0

网络客户端在响应为空时在 Mono 上重试

我想在带有 WebClient 的 WebFlux 中的 Mono 上实现条件重放。情况如下: 我从外部服务调用一个铭文 api,它进行处理然后保存 ...

回答 0 投票 0

带 Reactor 的非阻塞 ReentrantLock

我需要限制同时处理相同资源的客户端数量 所以我试图实现模拟 锁.锁(); 尝试 { 做工作 } 最后 { 锁定。解锁(); } 但在非...

回答 3 投票 0

通过共享运算符将可连接的 Flux 转换为 Hot

我有下面的程序,它首先创建一个区间通量,取 5 个元素并订阅它。 发布之后,我使用带有自动连接 2 的重播运算符将其转换为可连接的通量,然后转换...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.