用于与Apache Kafka使用者API相关的问题
可能是什么原因,卡夫卡消费者承认,抛出InterruptedException?
这是来自KafkaMessageListenerContainer的代码,在什么情况下它们是InterruptedException的可能性,可以被这个代码抛出,突然在我的应用程序日志中,这正在消耗......
我知道使用Kafka使用者的api我们可以获得与特定时间戳(getOffsetsByTimes())相对应的偏移量。我们如何获得偏移量并从一个点开始重放流...
如何配置使用者,以便在单次轮询中一次轮询50条记录。我已经创建了max_poll_records设置为50的消费者。但是我的消费者正在使用单个记录...
为了实现Kafka消费者对消息的一次性处理,我一次只提交一条消息,如下面的public void commitOneRecordConsumer(long seconds){KafkaConsumer
在我的应用程序中,我将使用来自100多个主题的数据。我应该为每个主题创建一个消费者还是从所有主题中创建一个消费者消费者?将创造100个消费者创造100 ...
例如,如果我在我的一个KafkaConsumer中有一个长时间运行的进程。 (假设需要1小时才能完成。)如果触发了重新平衡,那么这个消费者的撤销操作会等到这个消费者......
我正在使用Kafka Consumer API来构建使用者。消息结构很复杂。为了构建反序列化器,我实现了Deserializer类并提供了必要的实现。我...
在Consumer API中使用createDrainingControl?
我正在浏览Alpakka中Kafka的Consumer API文档。我遇到了这段代码。根据我的理解,使用msg.committableOffset()提交偏移量。那为什么......