我是 Kafka 新手,期待了解“寻求偏移量”和“重置偏移量”的工作原理。
例如,制作者开始制作从1,2,3,4,5,..,99,100开始的记录。我有一个有 5 个分区的消费者正在使用这些数据。我正在使用 createDirectStream 在消费者处开始流式消费。
我的问题是,Seeking 是否会获取每个分区的偏移量,并在消费数据后 Reset 是否会重置消费者中的偏移量,以说明它应该在下一次迭代中消耗哪个偏移量?
如果以上理解不正确,这里的查找或重置是如何工作的?
我正在使用的版本,2.4 Spark、0.10 kafka 和 0.12 scala。
节目:
val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
context,
PreferConsistent,
Subscribe[String, String](topicList, KafkaParameters)
)
stream
.foreachRDD { rdd =>
val rddSize = rdd.count()
if (rddSize > 0) { println(s"Received data $rddSize") }
else {println("Not received any data")}
}
context.start()
重置需要寻找,所以没有真正的区别。寻找并不会“得到”任何偏移量;您需要提供一个。
您可以使用
kafka-consumer-groups --reset-offsets
等工具对不活跃的消费者组进行重置,因此不会主动运行。在运行时,任何活跃的消费者都可以seek
并再次调用 poll
您需要一个消费者组来“重置”(即告诉 Kafka 消费者下一次执行时偏移量应该从哪里开始,通过查找和提交)
既然你提到了 Spark,但是它不使用消费者组,你应该使用检查点(和结构化流,而不是
createDirectStream
)和 Spark 3+,如果可能的话,因为 kafka 库有很多改进