关于 Spark Streaming 的快速问题。
我正在从 KafkaUtils 初始化 createDirectStream 作为流,并将其保存为 Spark-streaming 中的 InputDStream,如下所示。
val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
context,
PreferConsistent,
Subscribe[String, String](topicList, KafkaParameters)
)
stream
.foreachRDD { rdd =>
println("Executing within rdd")
val rddSize = rdd.count()
if (rddSize > 0) { println(s"Received data $rddSize") }
else {println("Not received any data")}
}
context.start()
我能够在近 50 分钟内看到生成的数据的输出。 50 分钟后,我可以在日志中看到以下消息,
Seeking to LATEST offset of partition topic_name-partition_number
Resetting offset for partition topic_name-partition_number to offset 12908.
但不再有日志显示“在 rdd 内执行”或“已接收数据 $rddSize”或“未收到任何数据”
当我启动消费者时,整个逻辑工作正常,但一段时间后它就停止工作了。知道这里发生了什么吗?
任何 Kafka 消费者都默认设置
auto.offset.reset=latest
,表示没有任何内容可消费的偏移量。正如您的日志所说,消费者正在重置并再次寻找主题的结尾。您需要一个积极运行的生产者才能查看具有该配置的任何数据
此外,Spark Streaming(这不是Kafka Steams)已弃用,Spark Structured Streaming 的所有功能都类似,至少对于 Kafka 而言是这样。