spring-cloud-stream 相关问题

Spring Cloud Stream允许用户使用Spring Integration开发和运行消息传递微服务,并在本地或云中或甚至在Spring Cloud Data Flow上运行它们。只需添加@EnableBinding并将您的应用程序作为Spring Boot应用程序(单个应用程序上下文)运行。您只需要连接到总线的物理代理,如果类路径上有相关的总线实现,则这是自动的。

Spring云流和异步流程

我有以下简单的消费者应用程序。我想知道如何使消费者仅在当前异步进程完成后才拉取下一条消息 @SpringBootApplication 公开课应用程序{...

回答 0 投票 0

EmitterProcessor 已弃用,我们如何替换它?

在提出问题之前,我浏览了很多链接,例如:Spring WebFlux (Flux): 如何动态发布,但还无法解决。 我有下面的代码,但是类型 EmitterProcessor 在提出问题之前,我浏览了很多链接,例如:Spring WebFlux (Flux): 如何动态发布,但还无法解决。 我有下面的代码,但是 EmitterProcessor> 类型已弃用,我如何将此代码转换为 Sinks.many().multicast().onBackpressureBuffer()? 配置.java @Configuration public class DynamicDestinationConfig { @Bean public EmitterProcessor<Message<?>> emitterProcessor(){ return EmitterProcessor.create(); } @Bean public Supplier<Flux<Message<?>>> supplier() { return () -> emitterProcessor(); } } 控制器.java @RestController public class DynamicDestinationController { @Autowired private EmitterProcessor<Message<?>> emitterProcessor; @Autowired private ObjectMapper jsonMapper; @SuppressWarnings("unchecked") @PostMapping("/") @ResponseStatus(HttpStatus.ACCEPTED) public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) throws Exception { Map<String, String> payload = jsonMapper.readValue(body, Map.class); String destinationName = payload.get("id"); Message<?> message = MessageBuilder.withPayload(payload) .setHeader("spring.cloud.stream.sendto.destination", destinationName) .build(); emitterProcessor.onNext(message); } }

回答 0 投票 0

消息<String>和消息<byte[]>

我正在使用Spring Cloud Stream功能模型 如果有人可以帮助了解哪个性能更好 功能 或者 功能 或者 功能 我正在使用 Spring Cloud Stream 功能模型 如果有人可以帮助了解哪个性能更好 功能 或者 功能 或者 功能、消息>

回答 0 投票 0

消费者函数和服务函数访问同一个kafka流的状态存储是线程安全的吗?

我正在使用 kotlin + Spring Boot + Kafka Streams 和 Spring Cloud Stream。 我有一个服务功能,可以验证客户端的请求并将其发送到 kafka 主题。 要验证请求,需要...

回答 0 投票 0

Spring Cloud Stream:Spring Boot 3.x:JsonProperty、JsonIgnoreProperties 在消息转换器中与 Jackson 的行为异常

我目前正在开发 Spring Boot 应用程序(版本 3.0.6)并使用 Spring Cloud(版本 2022.0.2)。我有两个不同的端点(“/mvc”和“/message”),它们使用两个

回答 1 投票 0

Spring Cloud Stream 嵌入式kafka

春季队, 我对 3.0.5 版本的 spring cloud stream 进行了非常简单的测试。它曾经工作过。 @嵌入式卡夫卡 @SpringBootTest 公共课 KafkaTest { @Autowired 私人

回答 0 投票 0

带有 Spring Cloud Contract 和测试容器的 Spring Cloud Stream kafka

我正在使用 Spring Cloud Stream 和 Spring Cloud contract 进行集成测试,我正在使用 TestChannelBinderConfiguration 并且它正在运行。现在我已经在我的消费中添加了 TestsContainer Kafka ...

回答 1 投票 0

Spring 云数据流一个处理器将海关数据发送到多个接收器

在 Spring SCDF 中,我有一个只有一个输入的处理器,我想根据数据类型将数据发送到 3 个不同的接收器,例如:整数到 sink1,字符串到 Sink3 和 Longs 到 SInk3,每个...

回答 1 投票 0

如何在 Spring Cloud Stream 中获取关联 ID

春季队, 下面的生产者能够成功地将值发送到 kafka 主题。 @豆 供应商> someProducer(){ 返回 () -> Flux.range(1, 10); } 但是..ho...

回答 1 投票 0

Spring Cloud Stream Test Binder 不填充主题标题

当从 Spring Cloud Stream 测试活页夹中使用消息时,不会填充以下标头: kafka_receivedTopic 但是当我连接到一个实际的 kafka 代理时,它就会被填充。 有没有...

回答 1 投票 0

为什么用@StreamListener 注释的 kafka 侦听器表现不同于 kafka 侦听器实现消费者接口?

我们正在将 spring boot 2.2.6.release 升级到 2.7.8,并且在 Kafka 侦听器上面临以下问题。 场景-1: 以前我们使用 SCS 3.0.4.Release 并使用@StreamListener 消费 Kafka 乱七八糟的......

回答 0 投票 0

如何拦截Spring Cloud Function消息

我正在从 Spring Cloud Streams 和已弃用的 StreamListeners 迁移到 Spring Cloud Function 及其消费者。我在我的应用程序中有一个拦截器来配置一些日志元数据和用户

回答 0 投票 0

Spring StreamBridge 在处理流桥上的批量消息时内存不足

我正在使用流桥通过 pubsub 发送消息 我有一些 100k 消息需要推送到 pubsub。我正在使用 5 个线程的执行程序池来完成这项工作 private void fetchAndPublish(列表<

回答 0 投票 0

Spring Cloud Dataflow 自定义流应用程序指标未显示在 Grafana 中

SCDF 版本 2.10.2 我无法让我的自定义源、处理器或接收器向我的 prometheus/grafana 设置公开指标。 我在这里遵循了自定义流应用程序教程:https://dataflow.sp...

回答 0 投票 0

spring-cloud-stream-binder 是否已经支持使用 Kinesis enhanced fan-out?

我找不到任何说明如何与 Kinesis 增强型扇出集成的文档。 还是,有这种可能吗? 我的配置示例: 春天: 云: 流:

回答 0 投票 0

docker-compose 在挂载卷时找不到证书文件

我在 docker 容器中运行一个 spring cloud stream 应用程序,需要挂载证书以对远程端点进行身份验证,但我不断收到 java.nio.file.NoSuchFileException w...

回答 0 投票 0

如何在高流量问题下让Spring Cloud Stream Kafka Function更可靠?

我不明白,为什么我总是在我的 Spring Cloud Stream Kafka 拓扑中收到 TimeoutException。 我的 Spring Cloud Stream 函数有以下配置: 春天: 应用: ...

回答 0 投票 0

使用 Spring Cloud Stream 3.0.9.RELEASE

我想在 Sprint Cloud Stream 3.0.9.RELEASE 中添加一个 bean 到自定义的 ConsumerInterceptor 中,因为 ConsumerConfigCustomizer 被添加了。但是,注入的 bean 始终为 NULL。 Foo(取决于...

回答 2 投票 0

Json 模式注册表 spring cloud kafka

我使用 spring cloud stream kafka,我对 json 模式注册表有疑问,我用 confluent 的 api 保存我的模式,但我总是遇到这个问题 org.springframework.messaging.

回答 1 投票 0

spring cloud函数定义的多个yaml文件

我们有大约 20 种不同的消息处理器,即 spring 云函数定义和 kafka 绑定。 因此分为 10 个不同的 yaml 文件。 曾尝试使用 spring.con 配置 yaml 文件...

回答 0 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.