inferSchema=true does not work for Spark Structured Streaming when reading CSV files

Problem description (0 votes, 3 answers)

I am getting the error message

java.lang.IllegalArgumentException: Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' and infer schema from it.

    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:251)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:115)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:115)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:35)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:232)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:242)
    at org.apache.spark.sql.streaming.DataStreamReader.csv(DataStreamReader.scala:404)
    at io.sekai.core.streaming.KafkaDataGenerator.readFromCSVFile(KafkaDataGenerator.scala:38)

when I load a CSV file with

spark2
  .readStream
  .format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  //.schema(schema)
  .option("delimiter", ",")
  .option("maxFilesPerTrigger", 1)
  .csv(path)

I also tried another form of the options, e.g.

spark2
  .readStream
  .format("csv")
  .option("inferSchema", value = true)
  .option("header", value = true)
  //.schema(schema)
  .option("delimiter", ",")
  .option("maxFilesPerTrigger", 1)
  .csv(path)

I want to infer the schema, so I commented out the explicit schema usage.

A sample of the CSV file looks like this:

id,Energy Data,Distance,Humidity,Ambient Temperature,Cold Water Temperature,Vibration Value 1,Vibration Value 2,Handle Movement
1,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
2,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
3,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
4,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
5,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
6,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
7,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
8,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
9,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2
10,0.00 246.47 0.00,4in, 12cm,55.50%,25°C,25°C,0,0,6.08 7.53 0.31m/s^2

What is going wrong here? I am strictly following the documented options, so how is the schema inference supposed to happen?

scala apache-spark spark-structured-streaming spark-csv
3 Answers
5 votes

You have two options:

  1. Write a sample of your data into the target directory before running the streaming query. When you run the streaming query again, the schema will be inferred from those files.
  2. Set spark.sql.streaming.schemaInference to true (a Scala sketch follows the quote below):

spark.sql("set spark.sql.streaming.schemaInference=true")

From the documentation:

By default, Structured Streaming from file based sources requires you to specify the schema, rather than rely on Spark to infer it automatically. This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures. For ad-hoc use cases, you can reenable schema inference by setting spark.sql.streaming.schemaInference to true.
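
A minimal sketch of option 2 in the question's Scala, assuming the spark2 session and path variable from the question (everything else is the standard DataStreamReader API):

// Ad-hoc re-enabling of schema inference for file-based streaming sources;
// inference then runs against the files already present in `path`.
spark2.sql("set spark.sql.streaming.schemaInference=true")

val df = spark2
  .readStream
  .option("header", "true")
  .option("delimiter", ",")
  .option("maxFilesPerTrigger", 1)
  .csv(path) // no .schema(...) call needed once schemaInference is on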


0 votes

We must specify the schema when creating a streaming source DataFrame.

From the documentation:

By default, Structured Streaming from file based sources requires you to specify the schema, rather than rely on Spark to infer it automatically. This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures.
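
For reference, a minimal sketch of what an explicit schema could look like for the question's CSV in Scala; the column types are assumptions (the sample values embed units like % and °C, so plain strings are the safest guess):

import org.apache.spark.sql.types._

// Hypothetical schema matching the sample header: `id` as an integer,
// all other columns as strings because their values embed units.
val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("Energy Data", StringType),
  StructField("Distance", StringType),
  StructField("Humidity", StringType),
  StructField("Ambient Temperature", StringType),
  StructField("Cold Water Temperature", StringType),
  StructField("Vibration Value 1", StringType),
  StructField("Vibration Value 2", StringType),
  StructField("Handle Movement", StringType)
))

val df = spark2
  .readStream
  .option("header", "true")
  .schema(schema) // an explicit schema satisfies the streaming source requirement
  .csv(path)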


0 votes

The solution is in the error message: "... If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' and infer schema from it."

First create the schema:

# Static batch read over the existing files, so inferSchema is allowed here.
file_schema = (spark.read
                    .format("csv")
                    .option("inferSchema", True)
                    .option("header", True)
                    .load(directory)
                    .limit(10)
                    .schema)

Then read the stream:

# Streaming read reusing the schema inferred above.
df = (spark.readStream
           .format("csv")
           .schema(file_schema)
           .load(directory))
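
Since the question uses Scala, the same two-step approach there would look roughly like this (a sketch; spark2 and path are taken from the question):

// Infer the schema once from a static batch read over the existing files...
val fileSchema = spark2.read
  .format("csv")
  .option("inferSchema", "true")
  .option("header", "true")
  .load(path)
  .schema

// ...then reuse it for the streaming read.
val streamDf = spark2.readStream
  .format("csv")
  .option("header", "true")
  .schema(fileSchema)
  .load(path)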