spring-cloud-stream 相关问题

Spring Cloud Stream允许用户使用Spring Integration开发和运行消息传递微服务,并在本地或云中或甚至在Spring Cloud Data Flow上运行它们。只需添加@EnableBinding并将您的应用程序作为Spring Boot应用程序(单个应用程序上下文)运行。您只需要连接到总线的物理代理,如果类路径上有相关的总线实现,则这是自动的。

在 Spring Cloud Stream 中应用动态组值

如何动态为流绑定器提供组值,以在使用 Spring Cloud Stream 时实现灵活性。我怎样才能实现这个目标?

回答 1 投票 0

Spring Cloud Stream Rabbit Binder 动态组名称

使用Spring Cloud Stream时,我需要动态地将组值赋予绑定器,我该怎么做?

回答 1 投票 0

我们可以使用 PostgreSQL 而不是默认的 dynamo 数据库来进行检查点和锁定,以防使用 Binder 方法从 Kinesis 消费数据吗

我有一个 Spring Boot 应用程序,它使用 Amazon Kinesis 来使用数据并将其保存到 PostgreSQL。 由于我已经在我的应用程序中使用一个数据库(PostgreSQL),我想避免使用另一个数据库

回答 1 投票 0

Spring Cloud动态消费者创建,消息类型丢失

我正在为其动态创建云流和消费者,因此无法正确定义消费者,因为有关类型的信息丢失了。 在下面的示例中,使用 @Bean 注释定义的消费者...

回答 2 投票 0

Amazon Kinesis Stream 名称未按照提供的 application.yml 文件获取

我有一个使用 Amazon Kinesis 来消费数据的 Spring Boot 应用程序。 它使用了错误的流名称 我需要使用的实际名称是:“test-test.tst.v1” 但它使用:“

回答 1 投票 0

Spring Cloud Stream 多个 Kafka 集群配置

在我的项目中,我需要连接到两个不同的Kafka代理。 我的 application.yaml 看起来有点像这样: 春天: 云: 功能: 定义:orderCreatedListener;orderProcessedLi...

回答 1 投票 0

PublishConfirm 的生产者配置

我试图弄清楚在使用 Spring Cloud Stream 应用程序构建生产者时是否有办法配置 PublishConfirm 行为。我知道建立生产者有两种方法 -

回答 1 投票 0

Spring云流项目,Leyton版本忽略kafka DLQ配置

我用Spring Cloud Streaming做了一些实验,其中kafka是数据源。 需要配置 dlq 而我的配置不支持它。 所以我打开了开源示例,它...

回答 1 投票 0

Spring 流项目,Leyton 版本忽略 kafka DLQ

我用Spring Cloud Streaming做了一些实验,其中kafka是数据源。 需要配置 dlq 而我的配置不支持它。 所以我打开了开源示例,它...

回答 1 投票 0

Kafka Streams 主题保留问题:动态主题配置和未发布的数据

我在与 Spring Cloud Stream 集成的 Kafka Streams 应用程序方面遇到问题。问题围绕着一个特定的主题,它的大小不断增加并且从不释放数据:

回答 1 投票 0

Spring Cloud azure kafka 与多个 eventhub 命名空间绑定

了解如何使用 eventhub kafka 配置 Spring Cloud 以在同一命名空间中发送和接收消息,如下所示 https://github.com/MicrosoftDocs/azure-dev-docs/blob/master/articles/java/spr...

回答 2 投票 0

Spring云流批处理生产者抛出`SerializationException`

我正在尝试使用文档中描述的批量生成器。 我创建了以下供应商: @豆 公共供应商>> fetch() { return () -> List.of...

回答 1 投票 0

如何处理 org.apache.kafka.common.errors.SslAuthenticationException: SSL 握手失败

我的 Kafka 客户端具有以下配置: 春天: 云: 配置: 启用:假 溪流: 卡夫卡: 活页夹: 经纪人:本地主机:9092 zkNodes:本地主机...

回答 2 投票 0

使用供应商为 RabbitMq 生成消息

我需要轮询消息并将其发布到 RabbitMQ。设置了100个store,每个store可以轮询5条以上消息并将其发布到RabbitMQ。每分钟进行一次轮询。下面是

回答 1 投票 0

调度程序没有频道“unknown.channel.name”的订阅者

我正在尝试使用 spring 绑定来运行发布者和订阅者。 确保目的地(交换)是在rabbit-mq本地创建的。 发布者(app.yml) 春天: 云: 溪流: ...

回答 1 投票 0

如何使用Spring Cloud Stream中的Kafka Streams绑定器将同一主题绑定到多个函数?

使用 Spring Cloud Stream 和 Kafka Streams 绑定器,我想在另一个函数中处理函数的输出,如下所示: @豆 公共函数,KStream 使用 Spring Cloud Stream 和 Kafka Streams 绑定器,我想在另一个函数中处理函数的输出,如下所示: @Bean public Function<KStream<String, Double>, KStream<String, Double>> sqrt() { return numbers -> numbers.mapValues(Math::sqrt); } @Bean public Consumer<KStream<String, Double>> log() { return sqrt -> sqrt.foreach((key, value) -> log.info("{}: {}", key, value)); } 其中 sqrt() 输出数字的平方根,然后用 log() 记录。 application.yaml因此看起来像这样: spring: cloud: stream: function: bindings: sqrt-in-0: numbers sqrt-out-0: sqrt-numbers log-in-0: sqrt-numbers kafka: streams: bindings: sqrt: consumer: application-id: sqrtApplicationId log: consumer: application-id: logApplicationId 启动应用程序时,出现以下错误: The bean 'sqrt-numbers' could not be registered. A bean with that name has already been defined and overriding is disabled. Action: Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true 当然,现在将 definition-overriding 设置为 true 并不是一个正确的解决方案,并且它会失败并显示 IllegalStateException。 我该如何解决这个问题? 问题的重现可以在这里找到:https://github.com/cedric-schaller/dltawareprocessor-type-error 假设您有两个名为 numbers 和 sqrt-numbers 的 Kafka 主题,则以下配置应该有效。 spring: cloud: stream: bindings: sqrt-in-0: destination: numbers sqrt-out-0: destination: sqrt-numbers log-in-0: destination: sqrt-numbers kafka: streams: bindings: sqrt-in-0: consumer: application-id: sqrtApplicationId log-in-0: consumer: application-id: logApplicationId 您可以使用 spring.cloud.stream.function.bindings.. 覆盖默认绑定名称。例如,如果您想将绑定名称从 sqrt-in-0 更改为 input,您可以像 spring.cloud.stream.function.bindings.sqrt-in0-0: input 那样进行操作。不过,您仍然需要在覆盖的绑定上设置 destination(通过 spring.cloud.stream.bindings.input.destination)。 您遇到的特定异常是因为您试图重用已创建的绑定名称 - sqrt-numbers。

回答 1 投票 0

具有两个输入主题的函数路由似乎破坏了 KafkaBinderMetrics

根据 https://stackoverflow.com/a/66871632/22992363,我正在配置一个功能路由器,它接受来自两个 Kafka 主题的输入。这是我的配置的精简副本: 春天: 云:

回答 1 投票 0

Spring云流产生消息数组,一次消费一条消息

我有一个简单的案例,我需要使用 Spring Cloud Stream 3.2.4 版本来实现。 我有一个预定的消息生产者: 公共类 MySourceOfMessagesImpl { 私人随机随机...

回答 1 投票 0

Spring Cloud Stream不会针对指定配置自动反序列化kafka消息

我使用 spring-cloud-stream-binder-kafka 和 spring-cloud-stream 以功能方式配置 kafka 流。 我的云依赖项来自 我使用 spring-cloud-stream-binder-kafka 和 spring-cloud-stream 以功能方式配置 kafka 流。 我的云依赖项来自 <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>2023.0.0</version> <type>pom</type> <scope>import</scope> </dependency> 主要依赖项是 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId> </dependency> 我的yml配置: kafka-conf: # server: localhost:9101 server: localhost:9092 group: test-streams consume-topic-report-details: report_details_topic report-details-dlq: report_details_dlq produce-topic-report-details: report_details_topic_redirect schema: http://localhost:8081 spring: application: name: ${kafka-conf.group}-streaming cloud: function: definition: reportDetails stream: bindings: reportDetails-in-0: contentType: application/*+avro destination: ${kafka-conf.consume-topic-report-details} group: ${kafka-conf.group}-streaming reportDetails-out-0: contentType: application/*+avro destination: ${kafka-conf.produce-topic-report-details} kafka: streams: binder: deserialization-exception-handler: sendToDlq configuration: commit.interval.ms: 100 default: key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde value.serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde bindings: reportDetails-in-0: consumer: dlqName: ${kafka-conf.report-details-dlq} binder: brokers: ${kafka-conf.server} schemaRegistryClient: endpoint: ${kafka-conf.schema} 问题详情 使用io.confluent.kafka.serializers.KafkaAvroSerializer序列化的消费消息。 我希望我的服务自动对流中的每条消息使用 io.confluent.kafka.serializers.KafkaAvroDeserializer,但我的 yml 配置中似乎忽略了某些内容。 结果我的蒸汽失败了 @Bean Function<ReportDetails, ReportDetails> reportDetails() { return data -> { log.info("input reportDetails: {}", data); return data; }; } 有例外 Caused by: java.lang.ClassCastException: class [B cannot be cast to class com.vl.model.avro.ReportDetails ([B is in module java.base of loader 'bootstrap'; com.vl.model.avro.ReportDetails is in unnamed module of loader 'app') at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeFunctionAndEnrichResultIfNecessary(SimpleFunctionRegistry.java:958) 另一方面,我尝试直接反序列化它(它有效)。 @Bean Function<byte[], byte[]> filterAbsence() { return dto -> { SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient("http://schema-server:8081", 5); try (KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer(schemaRegistryClient)) { AbsenceDto obj = deserializer.deserialize("report_details_topic", dto); log.info("received message: {}", obj); } log.info("received message: {}", dto); return dto; }; } 我怀疑我的 DQL 配置也不正确。 当我的消费者失败时,我希望消息将被重定向到 DLQ,但其中一个是空的。 问题: 如何修复我的 yml 配置以使反序列化自动工作? 如何修复我的 DQL 配置或获得确认(如果配置正确)? 您检查过应用程序日志吗? StreamsConfig 已在启动时登录,因此您可以验证您的配置是否已被拾取,看起来没有。 我不是 spring 人,也不是 yml 专家,但你为什么使用: default: key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde value.serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde 这些配置被称为 default.key.serde 和 default.value.serde 所以不应该是 default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde default.value.serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde 类似于你设置的commit.interal.ms?您可以验证它是否检测到提交间隔更改吗?

回答 1 投票 0

可以使用 Solace Spring Cloud Stream Binder 设置“立即确认”吗?

根据 https://solace.com/blog/inside-a-solace-message-using-header-properties, 我们可以为消息头设置“立即确认”等属性。 相关API可以在

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.