通过 Spark Streaming 高效读取 Kafka

问题描述 投票:0回答:1

我有一个应用程序,它从 Kafka 获取数据并将其保存到数据库中。我的代码如下所示:

spark.readStream
  .format("kafka")
  .options(options)
  .load()
  .writeStream
  .trigger(Trigger.ProcessingTime(20000))
  .foreachBatch({ (batch: DataFrame, _: Long) =>
    val rowsCount = batch.count
    saveBatch(batch)
    println(s"Saved $rowsCount rows")
  })
  .start()

在 Spark UI 中,我查看“Structure Streaming”选项卡,发现流的处理速率为每秒 100K 行。 如果我像这样删除行计数:

.foreachBatch({ (batch: DataFrame, _: Long) =>
    saveBatch(batch)
  })
  .start()

处理速率变为每秒 50K。 如您所见,在第一种情况下,我不使用缓存,当我计算批次中的行数并将批次保存到数据库时,我可能从 Kafka 读取数据两次(而不是第二种情况,当仅读取一次时) 我不敢相信,像行计数这样的琐碎操作会导致从 kafka 进行额外的读取,是否有任何方法可以在不缓存数据的情况下获取批长度?

apache-kafka spark-structured-streaming
1个回答
0
投票

@OneCricketeer 你是对的,Spark 不会多次从 Kafka 读取数据。我检查了 DAG 方案,两种情况下都只有一个“MicroBatchScan”阶段。另外,我比较了 Kafka 主机上的网络利用率,也没有任何差异(输出是相同的)。 看起来“结构流”选项卡中的“处理速率”意味着批处理期间已处理行的精确计数,但它不会影响从 Kafka 的读取

© www.soinside.com 2019 - 2024. All rights reserved.