Spark Streaming 不执行 foreach 中的代码行

问题描述 投票:0回答:1

关于 Spark Streaming 的快速问题。

我正在从 KafkaUtils 初始化 createDirectStream 作为流,并将其保存为 Spark-streaming 中的 InputDStream,如下所示。

val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      context,
      PreferConsistent,
      Subscribe[String, String](topicList, KafkaParameters)
    )

  stream
      .foreachRDD { rdd =>
        println("Executing within rdd")
        val rddSize = rdd.count()
        if (rddSize > 0) { println(s"Received data $rddSize") }
        else {println("Not received any data")}
      }

  context.start()

我能够在近 50 分钟内看到生成的数据的输出。 50 分钟后,我可以在日志中看到以下消息,

Seeking to LATEST offset of partition topic_name-partition_number
Resetting offset for partition topic_name-partition_number to offset 12908.

但不再有日志显示“在 rdd 内执行”或“已接收数据 $rddSize”或“未收到任何数据”

当我启动消费者时,整个逻辑工作正常,但一段时间后它就停止工作了。知道这里发生了什么吗?

apache-spark apache-kafka spark-streaming
1个回答
0
投票

任何 Kafka 消费者都默认设置

auto.offset.reset=latest
,表示没有任何内容可消费的偏移量。正如您的日志所说,消费者正在重置并再次寻找主题的结尾。您需要一个积极运行的生产者才能查看具有该配置的任何数据

此外,Spark Streaming(这不是Kafka Steams)已弃用,Spark Structured Streaming 的所有功能都类似,至少对于 Kafka 而言是这样。

© www.soinside.com 2019 - 2024. All rights reserved.