有一个 Spark 流应用程序,它从 Kafka 读取消息,处理它们,然后将它们存储到某个数据库。
在延迟优化期间,检测到从 Kafka 读取时持续延迟约 2 秒。
延迟的测量方式为“读取消息的时间”-“Kafka 代理分配的时间戳”(因为 Kafka 和 Spark 节点之间没有时间偏移)
没有有意设置的 Spark/Kafka 连接器配置限制单个批次的最小消息数量。延迟是恒定的,即在 100 msg/s 和 10.000 msg/s 下保持相同。
有人知道为什么会发生延迟以及如何消除它吗?
尝试运行应用程序几个小时以加热 JVM,尽管它没有改变任何东西; 尝试读取不同分区号的主题:20 和 30 - 也没有变化
完整的消息处理如下所示:
val additionalOptions = Map(
"startingOffsets" -> "latest",
"kafka.security.protocol" -> "SASL_SSL",
"kafka.sasl.mechanism" -> "SCRAM-SHA-512"
)
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("subscribe", ...)
.option("kafka.allow.auto.create.topics", "false")
.options(additionalOptions)
.load()
// df processing
df.writeStream
.outputMode("append")
.option("triggerType", "processingTime")
.option("triggerInterval", 0)
.option("checkpointLocation", ...)
.option("outputMode", "append")
.foreachBatch { (dataset: Dataset[W], batchId: Long) => writeBatch(dataset, batchId) }
.start()
根据文档
here,在可选配置下有一个设置kafkaConsumer.pollTimeoutMs
,默认为 2 分钟。因此理论上任何发布到 kafka 的消息都会以最多 2 分钟的延迟被消耗。更新此内容以减少延迟。
当您的消费者从流中读取数据时,根据生产者吞吐量和消费者的处理逻辑 - 您不想消费太早或消费太晚。
例如如果您的生产者每分钟生成 1 条消息,则超时 1 秒的轮询没有意义,因为 60 次轮询中只有 1 次有用。但是,如果您的生产者吞吐量约为 10 条消息/秒,您可能希望减少轮询超时。然后你的消费者处理就出现了——诸如消费者理想的批量大小应该是多少等问题。根据情况,您可以配置轮询超时。