从 Kafka Spark 读取时出现长时间延迟

问题描述 投票:0回答:1

有一个 Spark 流应用程序,它从 Kafka 读取消息,处理它们,然后将它们存储到某个数据库。 在延迟优化期间,检测到从 Kafka 读取时持续延迟约 2 秒。
延迟的测量方式为“读取消息的时间”-“Kafka 代理分配的时间戳”(因为 Kafka 和 Spark 节点之间没有时间偏移) 没有有意设置的 Spark/Kafka 连接器配置限制单个批次的最小消息数量。延迟是恒定的,即在 100 msg/s 和 10.000 msg/s 下保持相同。
有人知道为什么会发生延迟以及如何消除它吗?

尝试运行应用程序几个小时以加热 JVM,尽管它没有改变任何东西; 尝试读取不同分区号的主题:20 和 30 - 也没有变化

完整的消息处理如下所示:

val additionalOptions = Map( "startingOffsets" -> "latest", "kafka.security.protocol" -> "SASL_SSL", "kafka.sasl.mechanism" -> "SCRAM-SHA-512" ) val df = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", ...) .option("subscribe", ...) .option("kafka.allow.auto.create.topics", "false") .options(additionalOptions) .load() // df processing df.writeStream .outputMode("append") .option("triggerType", "processingTime") .option("triggerInterval", 0) .option("checkpointLocation", ...) .option("outputMode", "append") .foreachBatch { (dataset: Dataset[W], batchId: Long) => writeBatch(dataset, batchId) } .start()


apache-spark apache-kafka latency low-latency
1个回答
0
投票
简短回答

根据文档

here

,在可选配置下有一个设置kafkaConsumer.pollTimeoutMs,默认为 2 分钟。因此理论上任何发布到 kafka 的消息都会以最多 2 分钟的延迟被消耗。更新此内容以减少延迟。

理论

当您的消费者从流中读取数据时,根据生产者吞吐量和消费者的处理逻辑 - 您不想消费太早或消费太晚。

例如如果您的生产者每分钟生成 1 条消息,则超时 1 秒的轮询没有意义,因为 60 次轮询中只有 1 次有用。但是,如果您的生产者吞吐量约为 10 条消息/秒,您可能希望减少轮询超时。然后你的消费者处理就出现了——诸如消费者理想的批量大小应该是多少等问题。根据情况,您可以配置轮询超时。

© www.soinside.com 2019 - 2024. All rights reserved.