用于与Apache Kafka使用者API相关的问题
我正在使用sarama-cluster(由Golang kafka消费者客户端编写)在代理中,我的主题的分区偏移量为11000,我的消费者组的分区偏移量为10100.然后我运行我的集群消费者,...
假设我有一个使用来自kafka集群的日志的应用程序。我希望应用程序定期检查群集的可用性,并根据它执行某些操作。一世 ...
当在代理配置中将log.segment.bytes设置为512兆字节时,我有一个kafka主题不断滚动新的日志段。大多数日志文件平均为5-10千字节。如果我看......
如何将Confluent Control Center Interceptor添加到现有S3(Sink)连接器?监视接收器。我正在寻找文件。任何帮助表示赞赏。
我遇到了问题,我们正在使用Kafka和spark。我们正在使用forEachRDD:messages.foreachRDD {rdd => val newRDD = rdd.map {message => ...
可能是什么原因,卡夫卡消费者承认,抛出InterruptedException?
这是来自KafkaMessageListenerContainer的代码,在什么情况下它们是InterruptedException的可能性,可以被这个代码抛出,突然在我的应用程序日志中,这正在消耗......
我知道使用Kafka使用者的api我们可以获得与特定时间戳(getOffsetsByTimes())相对应的偏移量。我们如何获得偏移量并从一个点开始重放流...
如何配置使用者,以便在单次轮询中一次轮询50条记录。我已经创建了max_poll_records设置为50的消费者。但是我的消费者正在使用单个记录...
为了实现Kafka消费者对消息的一次性处理,我一次只提交一条消息,如下面的public void commitOneRecordConsumer(long seconds){KafkaConsumer
在我的应用程序中,我将使用来自100多个主题的数据。我应该为每个主题创建一个消费者还是从所有主题中创建一个消费者消费者?将创造100个消费者创造100 ...
例如,如果我在我的一个KafkaConsumer中有一个长时间运行的进程。 (假设需要1小时才能完成。)如果触发了重新平衡,那么这个消费者的撤销操作会等到这个消费者......
我正在使用Kafka Consumer API来构建使用者。消息结构很复杂。为了构建反序列化器,我实现了Deserializer类并提供了必要的实现。我...
在Consumer API中使用createDrainingControl?
我正在浏览Alpakka中Kafka的Consumer API文档。我遇到了这段代码。根据我的理解,使用msg.committableOffset()提交偏移量。那为什么......