project-reactor 相关问题

Reactor是一个基础库,用于构建JVM上的反应式快速数据应用程序。它提供了Java,Groovy和其他JVM语言的抽象,使构建事件和数据驱动的应用程序更容易。它也非常快。

根据之前的元素结果对通量元素执行操作

我有三个工作类型的课程,如下所示。 界面工作{ 布尔执行(); } 类 Service1 实现 Work{ @覆盖 布尔执行(){ //一些

回答 1 投票 0

用于多服务流媒体的 WebFlux 接收器

我正在尝试制作几个反应式微服务: 制作人之一: @RestController @RequiredArgsConstructor 公共类事件控制器{ 私人最终水槽。Many水槽; @PostM...

回答 1 投票 0

Reactor GroupedFlux - 等待完成

拥有像下面这样的异步发布者,Project Reactor 有没有一种方法可以等待整个流处理完成? 当然,无需为未知的持续时间添加睡眠...

回答 2 投票 0

Project Reactor Mono.就在 flatMap 中

尝试搜索一些有关此用法的信息,但尚未找到明确的答案。在阻塞操作上使用 flatMap(非反应式)。当某些操作如

回答 1 投票 0

为什么在 Flux 发射完元素后,Reactor 项目中的 Schedulers.newParallel() 没有停止运行?

我有一个原始的字符串 Flux,并在 main() 方法中运行此代码。 包com.example; 导入reactor.core.publisher.Flux; 导入reactor.core.scheduler.Schedulers; 导入reactor.util.

回答 1 投票 0

对于 Spring Boot Web 客户端,记录不可解析响应的首选机制是什么?

这是我的设置。我的大部分代码都通过 webflux 使用 .bodyToMono 。但是,如果发生解析错误,Webclient 没有很好的方法来获取导致问题的原始正文。在这个例子中,

回答 1 投票 0

如何在Spring Cloud Gateway中获取请求体并addHeader

我打算将一个项目从Zuul迁移到Spring Cloud Gateway。 我有一个“校验和代码”,但我不知道如何迁移它。 在 zuul 代码中,我获取了 url 参数和 json 正文,然后我执行

回答 3 投票 0

如何正确管理Reactor中的可关闭资源

我有一个http客户端和执行器,当所有工作完成后应该将其关闭。 我正在尝试按照此处针对 RxJava 1.x 描述的方式使用 Flux.using 方法: https://github.com/meddle0x53/learning-

回答 1 投票 0

使用ReactiveSecurityContextHolder设置后Spring WebFlux安全上下文为空

我正在使用 Spring Boot 和 Spring Security 开发反应式应用程序,特别是处理 Kafka 消费者中的身份验证。尽管使用

回答 1 投票 0

由于背压,下游消费者的 Kafka 主题分区暂停。最终只有一个分区被排空

在我的项目中,我使用 spring-kafka (v3.1.1) 和reactor-kafka (v1.3.22) 来消费来自特定主题的事件。我们称之为“主题a”。下游消费者处理每条消息...

回答 1 投票 0

Project Reactor 中 doOnNext 的“即发即忘”操作

我有一个 Flux 流。对于处理的每个元素,我希望触发一个异步/非阻塞操作。例如,从数据库更新返回 Mono 的方法。 我想要...

回答 2 投票 0

Reactor 不会抛出 NPE。它仍然是反应式流实现吗?

这是我在响应式流规范中找到的内容 调用 onSubscribe、onNext、onError 或 onComplete 必须正常返回,除非任何提供的参数为 null,在这种情况下它必须...

回答 1 投票 0

functionCatalog.lookup(“sendFluxToWeb|sendFluxToKafka”)的问题

我想要实现的目标: 以反应方式将字符串流发送到 webapi 并发送到 kafka。 显示一些代码: 导入 org.springframework.boot.SpringApplication; 导入 org.springframework...

回答 1 投票 0

项目 Reactor 堆栈中的条件行为

我需要创建一个从外部缓存获取数据的方法。如果找到数据,它会进行一些处理,否则它应该进行不同的处理。 该方法应该返回 Mono 单声道

回答 1 投票 0

WebFlux Flux.cache() 缓存发出的数据存储多长时间

上下文: 我有一个有限的 Flux,它是 Rest API 调用的来源,并且该通量位于对我的 Spring Boot 应用程序进行的主 REST API 调用的范围内,该应用程序返回 Mono 并响应。我愿意...

回答 1 投票 0

如何批量处理助焊剂并并行处理每个批次

我的数据库中有 5,000,000 个实体。我通过反应式驱动程序(r2dbc)连接到数据库。接下来,我想将其拆分为 100,000 个实体,将它们拆分为 1,000 个实体的捆绑包,然后

回答 1 投票 0

如果 Flux 为空,如何传递 Consumer/Runnable?

我有一个 Flux 元素,我想记录一条消息,以防它不包含任何元素。我尝试了这个,但它没有按预期工作。该异常没有被我的消费者“捕获”,而是

回答 1 投票 0

如何将 Mono 变成真正的异步(非反应式!)方法调用?

我有一个方法 @服务 公共类我的服务{ 公共 Mono processData() { ... // 非常长的反应操作 } } 在正常的程序流程中,我调用这个方法

回答 2 投票 0

关闭reactor netty HttpClient的正确方法

Reactor netty HttpClient 没有关闭或关闭句柄的方法。 HttpClient释放资源的正确方法是什么? 根据文档当我使用 HttpC 创建 HttpClient 时...

回答 1 投票 0

Project Reactor 中的默认调度程序是什么?

Project Reactor 中是否有“默认”调度程序?哪一个?我所说的“默认”是指当链没有调用 subscribeOn() 或publishOn() 时使用的默认值。

回答 2 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.