spring-kafka 相关问题

Spring for Apache Kafka(spring-kafka)项目将核心Spring概念应用于基于Kafka的消息传递解决方案的开发。

Spring-kafka默认用于微米观察

据我了解,对于spring-kafka,默认情况下禁用使用Micrometer进行观察。 我们需要在 KafkaTemplate 和 ContainerProperties 上将 ObservaEnabled 设置为 true 以启用观察...

回答 1 投票 0

Spring Kafka 提交监听器之后

我正在尝试编写抽象代码,该代码应该跟踪偏移量变化并在消息处理后执行一些操作。问题是我不想过多更改现有侦听器的代码。 找到了...

回答 1 投票 0

Spring Kafka 与数据库/MQ 交互的事务管理

我们正在尝试在 Spring Kafka 消费者中实现事务管理。 我们让 Kafka 消费者监听来自主题 A 的消息 -> 数据库更新/插入 -> 产生 Kafka 混乱...

回答 1 投票 0

@EmbeddedKafka 不适用于 Spring-Boot 3.2* 或 Spring 6.*

我们已经将spring-boot版本升级到3.2.1。发布嵌入式 Kafka(@EmbeddedKafka) 无法启动。 Java版本:17.0.8 MVN版本:3.5.8 子模块 POM 4.0.0&...

回答 1 投票 0

Kafka Compacted Topic:定期重置消费者偏移量

我想将实体信息存储在 Apache Kafka 压缩主题中。因此,某些具有相同键的值可能会被更新。 假设生产者发送一条带有消费者拥有的密钥的消息

回答 1 投票 0

测试用例记录将 spring-kafka 升级到 3.0.10 后,kafka 客户端连接问题数量增加

我们有一个Spring测试,我们创建一个Kafka模板的@MockBean,以便上下文在启动过程中不会初始化失败。这只是一个基类,其中没有任何测试

回答 1 投票 0

无法使用 Avro(自定义 Serdes)序列化 Spring Cloud Streams

我正在开发一个流处理应用程序,它连接两个流并将新记录输出到不同的主题。下面是我的配置 @豆 公共双功能 我正在开发一个流处理应用程序,它连接两个流并将新记录输出到不同的主题。以下是我的配置 @Bean public BiFunction<KStream<String, TestRecord>, KStream<String, TestRecord>, KStream<String, ModifiedTestPayload>> process() { return (walletCreated, placementCreated) -> placementCreated .selectKey((key, val) -> val.getClientId()) .join(walletCreated.selectKey((s, testPayload) -> testPayload.getClientId()), (ts, ts1) -> new ModifiedTestPayload(ts.getClientId()), JoinWindows.ofTimeDifferenceWithNoGrace(Duration.of(2, ChronoUnit.MINUTES)), StreamJoined.with(Serdes.String(), CustomSerdes.TestRecord(), CustomSerdes.TestRecord())); } 两个主题都将输入流以产生相同结构的数据。以下是我定义的自定义 Serdes。 public class CustomSerdes { public static Serde<TestRecord> TestRecord() { return new TestRecordSerde(); } public static class TestRecordSerde extends Serdes.WrapperSerde<TestRecord> { public TestRecordSerde() { super(new TestRecordSerializer(), new TestRecordDeserializer()); } } public static class TestRecordSerializer implements Serializer<TestRecord> { private final KafkaAvroSerializer inner; public TestRecordSerializer() { this.inner = new KafkaAvroSerializer(); } @Override public void configure(Map<String, ?> configs, boolean isKey) { inner.configure(configs, isKey); } @Override public byte[] serialize(String topic, TestRecord data) { return inner.serialize(topic, data); } @Override public void close() { inner.close(); } } public static class TestRecordDeserializer implements Deserializer<TestRecord> { private final KafkaAvroDeserializer inner; public TestRecordDeserializer() { this.inner = new KafkaAvroDeserializer(); } @Override public void configure(Map<String, ?> configs, boolean isKey) { inner.configure(configs, isKey); } @Override public TestRecord deserialize(String topic, byte[] data) { return (TestRecord) inner.deserialize(topic, data); } @Override public void close() { inner.close(); } } public static Serde<ModifiedTestPayload> ModifiedTestPayload() { return new ModifiedTestPayloadSerde(); } public static class ModifiedTestPayloadSerde extends Serdes.WrapperSerde<ModifiedTestPayload> { public ModifiedTestPayloadSerde() { super(new ModifiedTestPayloadSerializer(), new ModifiedTestPayloadDeserializer()); } } public static class ModifiedTestPayloadSerializer implements Serializer<ModifiedTestPayload> { private final KafkaAvroSerializer inner; public ModifiedTestPayloadSerializer() { this.inner = new KafkaAvroSerializer(); } @Override public void configure(Map<String, ?> configs, boolean isKey) { inner.configure(configs, isKey); } @Override public byte[] serialize(String topic, ModifiedTestPayload data) { return inner.serialize(topic, data); } @Override public void close() { inner.close(); } } public static class ModifiedTestPayloadDeserializer implements Deserializer<ModifiedTestPayload> { private final KafkaAvroDeserializer inner; public ModifiedTestPayloadDeserializer() { this.inner = new KafkaAvroDeserializer(); } @Override public void configure(Map<String, ?> configs, boolean isKey) { inner.configure(configs, isKey); } @Override public ModifiedTestPayload deserialize(String topic, byte[] data) { return (ModifiedTestPayload) inner.deserialize(topic, data); } @Override public void close() { inner.close(); } } } 和我的 application.yml 下面 spring: kafka: streams: application-id: test-app cloud: function: definition: process stream: kafka: streams: bindings: process-in-0: consumer: key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde value-serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde process-in-1: consumer: key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde value-serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde process-out-0: producer: key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde value-serde: ai.wownettests.kf.kafkastreamtestone.CustomSerdes$ModifiedTestPayloadSerde configuration: schema.registry.url: http://localhost:8081 binder: brokers: localhost:9092 configuration: schema.registry.url: http://localhost:8081 specific.avro.reader: true default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde bindings: process-in-0: destination: testkf.placements.created process-in-1: destination: testkf.wallet.created process-out-0: destination: testkf.client.profile_completed logging: level: root: info 当我启动我的应用程序时,我收到如下错误(我有一些关于该应用程序尝试处理的 kafka 主题的数据) org.apache.kafka.streams.errors.StreamsException: Unable to serialize record. ProducerRecord(topic=[test-app-KSTREAM-KEY-SELECT-0000000003-repartition], partition=[null], timestamp=[1706526131230] at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:231) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:154) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:48) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1765) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) ~[kafka-streams-3.6.1.jar:na] at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) ~[kafka-streams-3.6.1.jar:na] Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: You must configure() before serialize() or use serializer constructor with SchemaRegistryClient 通过创建自定义 serde 的 bean 修复了这个问题 @Bean public Serde<TestRecord> testRecordSerde() { Map<String, ?> config = Collections.singletonMap("schema.registry.url", "http://localhost:8081"); var r = CustomSerdes.TestRecord(); r.configure(config, false); return r; }

回答 1 投票 0

使用 Kafka 进行模拟单元测试 - 回调模拟生产者

我想围绕调用 Kafka 设置一些测试。我的 Kafka 调用有以下回调设置,请参见下文: @Async // 允许函数异步。还必须设置配置 @欧维...

回答 1 投票 0

Kafka 问题:MessageConversionException:无法将 GenericMessage 从 [java.lang.String] 转换为 [my_custom_model] [...]

我有 Kafka 生产者和消费者服务器,当我尝试发送消息时,出现以下异常: org.springframework.kafka.listener.ListenerExecutionFailedException:监听器方法无法

回答 2 投票 0

@EmbadedKafka 不适用于 Spring-Boot 3.2* 或 Spring 6.*

我们已经将spring-boot版本升级到3.2.1。发布嵌入式 Kafka(@EmbadedKafka) 无法启动。 Java版本:17.0.8 MVN版本:3.5.8 子模块 POM 4.0.0 我们已将 spring-boot 版本升级为 3.2.1。发布嵌入式 Kafka(@EmbadedKafka) 无法启动。 Java版本:17.0.8 MVN版本:3.5.8 子模块POM <modelVersion>4.0.0</modelVersion> <parent> <groupId>com.fault</groupId> <artifactId>topology</artifactId> <version>1.0.0-SNAPSHOT</version> </parent> <name>cdc-service</name> <artifactId>cdc-service</artifactId> <packaging>jar</packaging> <properties> <java.version>17</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>com.github.ulisesbocchio</groupId> <artifactId>jasypt-spring-boot</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency> <!-- <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> </dependency> --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <mainClass>com.cdc.MyServiceApplication</mainClass> </configuration> <executions> <execution> <goals> <goal>repackage</goal> </goals> </execution> </executions> </plugin> </plugins> </build> 父模块 Pom <properties> <java.version>17</java.version> <revision>1.0.0-SNAPSHOT</revision> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <!-- common --> <spring.boot.version>3.2.1</spring.boot.version> <jasypt.spring.version>3.0.5</jasypt.spring.version> </properties> 它抛出异常以下 java.lang.NoClassDefFoundError: org/apache/kafka/coordinator/group/assignor/RangeAssignor at kafka.server.Defaults$.<clinit>(KafkaConfig.scala:179) ~[kafka_2.13-3.6.0.jar:na] at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala:746) ~[kafka_2.13-3.6.0.jar:na] at kafka.server.KafkaConfig.DeleteTopicEnableProp(KafkaConfig.scala) 我们添加了所需的 jar (kafka-cordinator jar) 来解决此问题。但在那之后,它会抛出一些其他类未找到的错误。是否需要这种依赖性? 测试类级别注释 @SpringBootTest @DirtiesContext @EmbeddedKafka(partitions = 1, brokerProperties = {"auto.create.topics.enable=false"}, topics = {"topic1","topic2"}, ports = 9092) @TestMethodOrder(MethodOrderer.OrderAnnotation.class) @TestInstance(TestInstance.Lifecycle.PER_CLASS) 最新版本需要添加什么额外的东西吗? 从 Spring for Apache Kafka 开始的 @EmbeddedKafka 默认使用 KRaft 算法。 Kafka 客户端的 3.1 不支持显式端口。 或者不要使用那个 KafkaClusterTestKit 或使用 ports = 9092。在已关闭的问题中查看更多信息:https://github.com/spring-projects/spring-kafka/issues/2916

回答 1 投票 0

如何正确处理Spring Kafka Producer发送消息失败?

我有一个 Spring Kafka 应用程序,它接收 HTTP 请求并将其有效负载发送到 Kafka 主题。我想预见以下不成功的情况: 最初,应用程序可以运行

回答 1 投票 0

使用模拟消费者对 MessageListenerContainer 进行单元测试

我有一个带有 MessageListenerContainer bean 的 Spring Boot 应用程序,我想对其进行单元测试。如何使用模拟 Kafka 消费者来模拟接收多条记录?我已经搜索过了

回答 1 投票 0

可选逻辑类型情况下的 Avro 序列化问题

我没有任何kafka和相关技术的经验,所以我需要你的帮助。 我面临以下问题: 在我当前的项目中,我们使用融合平台(kafka + schema 注册表)。对于

回答 1 投票 0

我如何使用mockito-kotlin来检查ProducerFactory?

我在 Kotlin 上有一小段代码,其中配置了 Producer。 测试实现(kotlin(“测试”)) testImplementation("org.mockito.kotlin:mockito-kotlin:4.1.0") 财产...

回答 1 投票 0

Kafka 版本:3.0.1- 重复创建 kafka 管理客户端 - 内存泄漏

我们有 springboot 应用程序,它从单个主题消费并生成多个主题的记录。 最近将此应用程序升级到了 Sprinboot-2.6.7 以及 gradle 项目中相应的其他依赖项....

回答 2 投票 0

带有虚拟线程和@Async的Spring kafka

我查看了 spring-kafka 文档,但它没有提到使用 @KafkaListener 和 @Async 考虑虚拟线程。 例如,在 Spring Boot 的配置中我们设置: spring.threads.virtual.

回答 1 投票 0

在Spring Kafka中实现批量事件的Exactly-Once处理

我正在开发一个 Spring Kafka 应用程序,我在其中为主题 A(列表<'Sms>)生成批量事件。目标是在整个流程中实现一次性处理,包括消费...

回答 1 投票 0

Springboot Kafka 自动配置 - SASL_PLAINTEXT 的 SSL 捆绑包与 SCRAM-SHA-512

自SpringBoot 3.2版本起 公共地图 buildAdminProperties() 在版本 3.4 中已弃用并标记为删除 https://docs.spring.io/spring-boot/docs/current/api/org/

回答 1 投票 0

将值注入自定义 DeadLetterPublishingRecoverer

我有一个自定义的死信恢复器,我已经实现了它,以便我可以重写 createProducer 方法 受保护的 ProducerRecord createProducerRecord(ConsumerRecord<...

回答 2 投票 0

org.apache.kafka.clients.NetworkClient 引导代理 bootstrap-servers-ip:9092 已断开连接

我正在本地系统上运行apache kafka,它运行得非常好。但在冒烟测试期间,我的应用程序无法连接到 kafka 集群。它不断抛出以下内容...

回答 2 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.