Scala Dataframe / SQL:用于报告的动态列选择

问题描述 投票:-1回答:1

Context

我们有一个具有5个流的模型。

  • Flow-1:从不同来源输入数据,这些数据输入到Flow-2中
  • Flow-2,Flow-3和Flow-4:是ML模型,每个模型都将几个字段倾倒到s3中
  • Flow-5:具有来自Flow-2,Flow-3,Flow-4输出的数据的报告层
  • 整体数据量很小

问题

  • Flow-5是基于少量SQL的报告层,其输入数据来自Flow-2,Flow-3和Flow-4。
  • [Flow-2,Flow-3和Flow-4具有一个共同的连接字段,其余字段不同。

  • 我们可以创建一个连接Flow-2,3,4数据的SQL,该数据存储在三个不同的表中,几乎没有计算/聚合。但是,Flow-2,3,4的输出字段数在每次运行中可能会有所不同。] >

    • 问题1:每次s3文件(Flow-2 / 3/4)结构更改时,由于目标表架构与s3文件结构不同(要修复,需要手动添加/删除,因此在COPY期间通常会导致问题)目标表中的字段,以与s3数据对齐)
    • 问题2:对于s3文件中的任何添加/删除,都需要通过添加/删除列来对报告进行更改

方法:

  • SQL方式-标准化s3转储/目标表/报告SQL,即标准化每个flow(2,3,4)输出以及目标表中可能的列数,因此,如果没有任何字段,只需在s3转储期间将它们加载为NULL /空白,而将COPY加载为空白。标准化与s3模板对齐的目标表结构。还可以标准化报表SQL

  • SCALA / SPARK:当前正在探索此选项。为了执行PoC,创建了两个s3转储,在scala中创建了两个数据帧,尝试了数据帧联接以及spark SQL联接。我仍然不确定是否仍然可以动态选择新列,即使Spark代码通用。

    • 通过创建直接指向s3的数据帧,我们可以解决COPY数据(动态字段)到目标表的问题。

      ] >>
    • 但是,报告的SQL问题仍然存在(或者至少我不知道,需要找到一种方法来处理它)

    问题:

无论如何,我们可以在Scala / SparkSQL中处理问题(SQL中的动态列选择吗?

上下文:我们有一个包含5个流的模型。 Flow-1:来自不同源的输入数据,这些数据输入到Flow-2,Flow-2,Flow-3和Flow-4:是ML模型,每个模型都将少量字段倾倒到s3。Flow-...

scala apache-spark pyspark apache-spark-sql pyspark-sql
1个回答
0
投票
{ 1. val sqlScript = "select col1, col2, .... from ... " // string we can create dynamic val df = spark.sql(sqlScript) 2. try use schema = = StructType(Seq( StructField("id",LongType,true), .... )) // and then use schema.fieldsName... or val cols: List[Columns] = ... // in df.select(cols:_*) 3. get schema (list fields with json file) package spark import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession} import org.apache.spark.sql.types.{DataType, StructType} import scala.io.Source object DFFieldsWithJson extends App { val spark = SparkSession.builder() .master("local") .appName("DataFrame-example") .getOrCreate() import spark.implicits._ case class TestData ( id: Int, firstName: String, lastName: String, descr: String ) val dataTestDF = Seq( TestData(1, "First Name 1", "Last Name 1", "Description 1"), TestData(2, "First Name 2", "Last Name 2", "Description 2"), TestData(3, "First Name 3", "Last Name 3", "Description 3") ).toDF() dataTestDF.show(false) // +---+------------+-----------+-------------+ // |id |firstName |lastName |descr | // +---+------------+-----------+-------------+ // |1 |First Name 1|Last Name 1|Description 1| // |2 |First Name 2|Last Name 2|Description 2| // |3 |First Name 3|Last Name 3|Description 3| // +---+------------+-----------+-------------+ val schemaJson = """{ "type" : "struct", |"fields" : [ |{ | "name" : "id", | "type" : "integer", | "nullable" : true, | "metadata" : { } | }, | { | "name" : "firstName", | "type" : "string", | "nullable" : true, | "metadata" : {} | }, | { | "name" : "lastName", | "type" : "string", | "nullable" : true, | "metadata" : {} | } | ]}""".stripMargin val schemaSource = schemaJson.mkString val schemaFromJson = DataType.fromJson(schemaSource).asInstanceOf[StructType] println(schemaFromJson) // StructType(StructField(id,IntegerType,true), StructField(firstName,StringType,true), StructField(lastName,StringType,true)) val cols: List[String] = schemaFromJson.fieldNames.toList val col: List[Column] = cols.map(dataTestDF(_)) val df = dataTestDF.select(col: _*) df.printSchema() // root // |-- id: integer (nullable = false) // |-- firstName: string (nullable = true) // |-- lastName: string (nullable = true) df.show(false) // +---+------------+-----------+ // |id |firstName |lastName | // +---+------------+-----------+ // |1 |First Name 1|Last Name 1| // |2 |First Name 2|Last Name 2| // |3 |First Name 3|Last Name 3| // +---+------------+-----------+ } }
© www.soinside.com 2019 - 2024. All rights reserved.