kafka-consumer-api 相关问题

用于与Apache Kafka使用者API相关的问题

kafkaconsumerfactory 与 kafkalistenercontainerfactory

Spring Kafka 中消费者和容器之间的关系/区别是什么? 我问这个问题是因为Spring提供了kafkaconsumerfactory和kafkalistenercontainerfactory,而我

回答 1 投票 0

java kafka消费者类如何处理分区?

我基本上有一个带有 Spring Boot 应用程序多个任务的系统,使用以相同消费者组 id 运行的 Kafka 消费者类。它们都与同一主题相关。我没有

回答 1 投票 0

Spring kafka无限重试,无需retryabletopic注解

我们使用 spring Kafka 3.x 并使用 DefaultErrorHandler。 在此处理程序中,我们检查异常是否可重试,然后重试直到 Integer.maxValue,否则将消息放入 DLT 中。 整个想法是...

回答 1 投票 0

如何从具有多个分区(在我的例子中是三个分区)的 Kafka 主题中始终读取最新(最后)消息?

在我的 .net C# 项目(带有 Confluence Kafka 库)中,目前我正在使用以下代码从 Kafka 主题读取最新消息。但是通过这个代码我可以读取来自 def 的最新消息...

回答 1 投票 0

如何获取消费者最后一次提交偏移量的时间?

我正在尝试搜索消费者组上次提交偏移量的时间。我并不是想知道偏移量的位置,而是想知道消费者组上次提交偏移量的时间。 我已经...

回答 1 投票 0

如何使用Kafka Console Consumer消费两个时间戳之间的消息

是否可以在 Kafka 控制台消费者中检索特定时间戳范围的消息? 例如昨天 08:00 到 09:00 之间的 kafka 消息。

回答 3 投票 0

将Map从KafkaProducer发送到KafkaConsumer

我正在使用java 21和Spring Boot 3.2.0(快照)。我想使用从 KafkaProducer 到 KafkaConsumer 发送 Map>。 KafkaConsumerConfig 类: @配置 噗...

回答 1 投票 0

导入错误没有名为kafka的模块

我正在尝试从kafka导入KafkaConsumer。 它说: 没有名为 kafka 的模块 从 kafka 导入 KafkaConsumer 导入系统 消费者 = KafkaConsumer('测试', bootstrap_servers='10.221.129.223'...

回答 4 投票 0

集群迁移时,迁移后的kafka消费者重新分区后表现如何?

我对消费者的行为有一些概念上的怀疑。 集群迁移提供了很好的机会,可以对当前集群中过度分区或分区不足的主题进行重新分区...

回答 1 投票 0

如何在Python中处理特定数量的消息后优雅地停止Kafka消费者?

我有一个带有 BashOperator 的 Airflow DAG,它运行 Kafka 生产者,生成随机数量的消息。这些消息由 Kafka 消费者消费,并将它们写入 JSON 文件。然而...

回答 1 投票 0

kafka最早和最新的偏移值有什么区别

生产者发送消息1,2,3,4 消费者收到消息1,2,3,4 消费者崩溃/断开连接 生产者发送消息 5、6、7 消费者恢复并应该收到开始的消息...

回答 3 投票 0

Spring KafkaListener 断开连接并停止消费 - CommitFailedException

我有一个 Kafka 主题,它从不(故意)清除消息。我想不断地消费来自它的消息,即使我的消费者离线几天/几周/几个月。据我了解,这...

回答 1 投票 0

设置Kafka的保留期限

我正在尝试将kafka主题的保留期设置为1000毫秒,以删除其上不需要的消息。 我用过 bin/kafka-configs.sh --zookeeper my-zookeeper:2181 --alter --实体类型主题 --

回答 1 投票 0

Kafka Consumer 滞后的 Prometheus 警报规则

我想跟踪是否有任何应用程序停止从 kafka 主题消费。为此,我在警报管理器中添加了 Kafka Consumer lag 警报规则,每当条件满足时,该规则就会在 slack 通道上发送警报...

回答 1 投票 0

Spring Kafka:处理长时间作业时如何防止分区撤销

我想问一下,有没有什么办法可以防止consumer在使用@KafkaListener处理长时间操作时被撤销? 我有一个申请,里面有需要花很长时间的工作...

回答 1 投票 0

Spring Boot中Kafka Consumer的动态主题订阅

在我的库类中,我正在创建一个Kafka消费者,我的配置类如下: @服务 公共类消费者工厂{ 私有 TopicNameProvider topicNameProvider; 私人...

回答 1 投票 0

当消费者死亡时使用 Kafka 重试作业

Kafka可以监控消费者的心跳并通过持久化在队列中重试作业(最终将其发送给不同的消费者)吗? 我还知道 SQL 表可以用作此 p 的队列...

回答 1 投票 0

Python 中的 ConfluenceKafka:使用消费者到消费者记录集

我已经开始学习confluence kafka(python)。有 1 个生产者、1 个主题、1 个分区和 1 个消费者(简单设置)。我的要求是我希望集中获取数据。我读到了使用...

回答 1 投票 0

默认KafkaConsumerFactory - ConcurrentHashMap - NullPointerException

我正在使用kafka消费者。我从 Spring boot 1.5 升级到 2.6。现在,当我运行应用程序时,它无法开始抛出 NullPointerException 。请告诉我是否有人可以帮助...

回答 2 投票 0

Kafka 更新元数据失败

我正在使用 Kafka v0.10.1.1 和 Spring-boot。 我正在尝试使用以下生产者代码在 Kafka 主题移动用户中生成消息: 主题移动用户有5个分区和2个复制fa...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.