Spring for Apache Kafka(spring-kafka)项目将核心Spring概念应用于基于Kafka的消息传递解决方案的开发。
我使用 Spring Boot 2.7 与 Kafka 集成来消费记录。 这是我的听众,非常简单: @KafkaListener(主题 = {"journal-topic1"}) 公共无效onMessage(列表<
我在thransaction中选择更新,我按限制从数据库项目列表中选择(一项选择10个项目),然后必须将它们全部发送到kafka并在s内从数据库中删除所有它们...
我有一个 Spring Boot 应用程序,它有一个带有 @KafkaListener 的简单 Consumer。我有阻止重试逻辑,可以按预期工作,但如果出现反序列化异常,我想存储...
在 Spring Boot 中使用自定义容器工厂测试 Kafka 监听器
我在尝试测试卡夫卡监听器时遇到了很大的麻烦。 该类如下: @成分 类 KafkaListener( 私有 val 用例:UseCase ){ @KafkaListener( 主题 = [...
将 ConsumerRebalanceListener 添加到 ConcurrentKafkaListenerContainerFactory
在 Spring Boot 应用程序中,我使用一个用 @KafkaListener 注释的类作为消息监听器。我想向我的应用程序添加 ConsumerRebalanceLister 以管理重新平衡时的缓存数据。 ...
使用 Spring-Kafka 3.0.10 重试TopicNamesProviderFactory
我使用 Spring-Kafa 2.7 为我的 kafka 应用程序实现了非阻塞重试。我需要 DLT 的自定义主题名称,因此使用了 RetryTopicNamesProviderFactory。重试过程按预期进行...
从 Kafka 生成的消息中删除 x_datadog_kafka_ Produced 标头
我们正在使用 spring-kafka 库将消息发布到 kafka 主题。我们还有 datadog 来监控 kafka 队列。 发布消息时,x_datadog_kafka_ Produced标头被注入...
Spring Boot Kafka:如果我重新启动消费者,如何防止消费相同的消息?
我启动了kafka,并在主题用户中生成了1000条消息 我创建了一个 Spring Boot Consumer 来侦听该主题并将消息保存在数据库中 如果我关闭消费者应用程序会怎样...
KafkaStream 在代理滚动升级时达到 ERROR 状态
我正在使用 Kafka 2.8.1 (AWS MSK)。我观察到,每当 AWS 完成一些滚动升级时,我的 Kafka 流应用程序都会达到错误状态,并且在尝试时不断收到以下异常...
嗨,升级到 Spring Boot 3 后,我遇到了 swagger ui 的问题。swagger-ui 不再工作,我得到了 404 和“白标签”页面作为响应。 近距离观察后...
使用kafkaListener和syncCommit,但它们不起作用
我有一个应用程序,它侦听 Kafka 主题并在 1 个线程中从中读取 1 条消息。读完一条消息后,有一定的逻辑,期间有两个选择——一切都是
Spring Kafka Consumer Client-Id 配置
我有两个 Kafka Listener 组件,每个组件监听不同的主题并期望不同的负载。 我的问题是,我可以为两者使用相同的客户端 ID 还是必须是
KafkaException:类不是 org.apache.kafka.common.serialization.Deserializer 的实例
我想实现发送和接收Java序列化对象的Kafka生产者。我试过这个: 制作人: @配置 公共类 KafkaProducerConfig { @Value(值 = "${kafka.
Kafka - 使用多个主题时出现 InvalidTopicException
我有一个 SpringBoot 应用程序,其中使用带有主题和 groupId 的 @KafkaListener 注释。我的听众需要听两个主题。当听一个主题时,任一
使用 Spring Boot Starter 父级:3.1.3。 我正在尝试实施阻塞重试。这是简单的参考实现。但它仍然不起作用。任何建议请。 @成分 公共...
在将记录发送到 kafka-stream 中的主题时设置标头
我需要处理kafka-stream中的请求记录,并且还需要告诉发送者处理的响应。我创建了一个 Rest 端点来接收请求。该端点使用 spring-kafka 的
如何在 Spring Kafka 配置中设置 ssl 文件的位置?
我正在尝试在 Spring 客户端和 Kafka 之间建立 ssl(mTLS) 连接。 我得到了两个文件: -app.keystore; -client.truststore。 当我使用 /usr/bin/keytool -list -rfc 检查文件时 -
EmbeddedKafka w/ContainerTestUtils.waitForAssignment 抛出:预期为 1,但获得了 0 个分区
我们有一个集成测试,我们使用 EmbeddedKafka 并向某个主题生成一条消息,我们的应用程序处理该消息,然后将结果发送到第二个主题,我们在其中使用并断言输出...
每当我调用此函数时,我都想要特定 id 的旧 kafka 事件。我应该使用 Kafka Stream 还是 kafkaListner
当旧事件数量较少或较多时,这是首选。我正在使用弹簧靴
对于反应式 Kafka 中的一个生产者和多个消费者来说,是否可以拥有恰好一次语义?
我想知道是否可以通过分布式消费者以反应方式实现一次语义。 在 Reactor Kafka 的参考文档中,我们有一个 Exactly-once 交付的示例代码。 ...