Spring for Apache Kafka(spring-kafka)项目将核心Spring概念应用于基于Kafka的消息传递解决方案的开发。
如何使用具有 2 个具有不同组 ID 的消费者的单个应用程序实例加入 kafka 主题
我有一个场景,我需要在同一个应用程序中创建 2 个消费者,具有 2 个不同的组 ID,并且都使用来自同一主题的消息。实现它的理想方法是什么? 我试过了……
如果不可用,如何配置 Kafka 生产者停止重试连接到代理? 我添加了重试配置,但它不起作用,它仍然无限期地重试...... 我想滑雪...
Kafka 生产者在 org.apache.kafka.clients.producer.RoundRobinPartitioner 中的 0、2、4 等替代分区中产生价值
Kafka 生产者在 org.apache.kafka.clients.producer.RoundRobinPartitioner 中的 0,2,4...等替代偶数分区中产生价值 我有 6 个分区,它们是 6 个并发消费者
Spring Kafka 获得无限重试尝试 版本:Spring-Kafka-2.2.4.RELEASE 听众设置 @豆 public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFa...
教程网站上的一个非常简单的 Kafka 消费者应用程序:https://www.baeldung.com/spring-kafka 但是一旦用 openJDK 17 容器化,这个问题就 100% 可重现: 错误 1 --- [ntainer#0-0-C-...
Spring Example HTTP Polling with Apache Kafka Request/Reply
有人问我这个问题,我想如果存在类似的要求,我会把它贴在这里帮助社区。 释义: 我想轮询一个 HTTP 端点以获取请求/回复
使用 JsonSerializer kafka 相对于手动解析 String 的优势
在 kafka 消费者中使用 JsonDeserializer 与在应用程序内部使用 Jackson 手动解析接收到的 JSON 字符串相比有什么优点或缺点吗? 会不会有什么小鬼...
Spring-boot Kafka header byte[]格式问题
我正在将我的 spring-boot-starter-parent 版本从 2.1.9.RELEASE 升级到 2.6.14。注意到所有 Kafka 消息头值都被转换为 byte[] 格式而不是 String 值。这是影响...
编辑 Kafka Listener Spring App 以更改 Stage/Target
我可以利用另一个运行 Kafka 应用程序/代码库的团队来使用相同的数据/将其加载到我们的新暂存表中,而不是他们的。他们有很多不同的 kafka listener adapter .j...
在 Spring Kafka ErrorHandler 中将 RetryableException 视为 RetryableException 之后的 NotRetryableException?
我正在实现一个带有基本恢复器(只是一个日志)的 DefaultErrorHandler 和一个无限的 ExponentialBackOff 当数据库不在这里时: ExponentialBackOff exponentialBackOff = 新的 ExponentialBack...
Kafka Listener Adapter .Java 文件的更改阶段/目标
我可以利用另一个运行 Kafka 应用程序/代码库的团队来使用相同的数据/将其加载到我们的新暂存表中,而不是他们的。他们有很多不同的 kafka listener adapter .j...
我需要了解 Kafka 独家消费者重试行为以进行固定退避。 独占消费者正在从具有 6 个分区的主题中读取数据。所有 6 个分区都有数据。 固定B...
用于 spring boot 微服务的 kafka docker 的 ACL 设置
我是 confluent 的新手,所以真的不知道如何正确实施 acl 授权。我想使用 apache kafka 作为我的微服务之间的消息传递器,我已经成功实现了它。现在我...
我在我的项目中使用 spring kafka 并致力于添加重试功能。 作为代码工作流程的一部分,一条消息被消费,我们对某个端点进行休息调用。如果失败我需要...
Spring Kafka - 主题的动态注册/取消注册 - 导致抵消问题
我们正在尝试利用动态 Kafka 主题监听(基于方法 https://www.geeksforgeeks.org/spring-boot-start-stop-a-kafka-listener-dynamically/)来处理 Kafka 的灾难恢复
我需要向特定的 Kafka 主题发送消息。 我使用以下 KafkaTemplate 来执行此操作: Kafka模板 在 Kafka 生产者中放入以下参数: 初...
如何在运行时创建 KafkaConsumer? (在 Spring Boot 中)
我正在开发一个已经使用和创建 Kafka 消息的应用程序。使用 @Component 创建在启动时初始化的侦听器,并使用发送到其
这是我正在为其编写单元测试的制作人发送方法: public void send(final ReadingForecastRequest requestStub) { logger.info("{}::send()", getClass().getCanonicalNam...
org.apache.kafka.common.errors.TimeoutException:“topicna”的 1 条记录过期 -4:自批创建以来已经过去了 96866 毫秒加上逗留时间
org.apache.kafka.common.errors.TimeoutException:“topic_namexxxx”的 1 条记录过期 -4:自批创建以来已经过去了 96866 毫秒加上滞留时间 生产者发布消息失败...
我正在写一个kafka消费者。我已经将Acknowledged属性设置为手动。因此,每当消费者未能处理消息时,我都不会确认。现在我想让消费者处理这个失败的消息