Spark MicroBatchExecution:流查询取得了进展……真的吗?

问题描述 投票:0回答:1

我正在运行增量流查询,并且在“什么都没有发生”的情况下,我不断收到StreamingQueryListener拦截的数十亿次更新和QueryProgressEvent-似乎如此。如果未检测到要处理的行,为什么会触发这些事件?什么被视为“进步”?

对我来说,这只是原木污染,这是不必要的,我不得不找到一种方法将其静音,直到发生“真正的”事情,但是我仍然对为什么和如何感到好奇。

20/01/01 23:18:21 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "bca1d3d2-4196-4e89-9dcf-916536bd00a6",
  "runId" : "2e6bfbef-cea1-48dd-b228-39f7fdc09e27",
  "name" : "STREAM_DELTA",
  "timestamp" : "2020-01-01T23:18:21.950Z",
  "batchId" : 1,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 1,
    "triggerExecution" : 1
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "FileStreamSource[file:/delta/source]",
    "startOffset" : {
      "logOffset" : 0
    },
    "endOffset" : {
      "logOffset" : 0
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "DeltaSink[/delta/output]"
  }
}
apache-spark spark-structured-streaming
1个回答
0
投票

如果未检测到任何行,为什么会触发那些事件?

结构化流不是事件驱动的。结构化流可以连续运行,也可以通过微量批量运行。

  • 连续:您的视频流不间断运行。一轮运行结束后,下一轮开始。
  • Microbatching:您的流根据您的触发规则以一定间隔运行(例如5秒)。当一个流运行结束时,它将等待5秒钟,直到重新运行。

不管哪种情况,流总是检查输入位置是否有新文件要处理。如果有新文件,它将按配置处理它们,并将文件名写入其检查点,以使这些文件不会被重新处理为新文件。如果没有新文件,它将结束运行,因为它认为没有工作要做。这就是为什么即使没有检测到行也触发这些事件的原因。

什么被认为是“进步”?

进度被视为成功运行的结论,如您发布的日志所示。流通过运行而“进步”。

© www.soinside.com 2019 - 2024. All rights reserved.