Context:
我们有一个具有5个流的模型。
问题:
[Flow-2,Flow-3和Flow-4具有一个共同的连接字段,其余字段不同。
我们可以创建一个连接Flow-2,3,4数据的SQL,该数据存储在三个不同的表中,几乎没有计算/聚合。但是,Flow-2,3,4的输出字段数在每次运行中可能会有所不同。] >
方法:
SQL方式-标准化s3转储/目标表/报告SQL,即标准化每个flow(2,3,4)输出以及目标表中可能的列数,因此,如果没有任何字段,只需在s3转储期间将它们加载为NULL /空白,而将COPY加载为空白。标准化与s3模板对齐的目标表结构。还可以标准化报表SQL
SCALA / SPARK:当前正在探索此选项。为了执行PoC,创建了两个s3转储,在scala中创建了两个数据帧,尝试了数据帧联接以及spark SQL联接。我仍然不确定是否仍然可以动态选择新列,即使Spark代码通用。
通过创建直接指向s3的数据帧,我们可以解决COPY数据(动态字段)到目标表的问题。
] >>但是,报告的SQL问题仍然存在(或者至少我不知道,需要找到一种方法来处理它)
问题:
无论如何,我们可以在Scala / SparkSQL中处理问题(SQL中的动态列选择吗?
上下文:我们有一个包含5个流的模型。 Flow-1:来自不同源的输入数据,这些数据输入到Flow-2,Flow-2,Flow-3和Flow-4:是ML模型,每个模型都将少量字段倾倒到s3。Flow-...
{
1.
val sqlScript = "select col1, col2, .... from ... "
// string we can create dynamic
val df = spark.sql(sqlScript)
2. try use schema = = StructType(Seq(
StructField("id",LongType,true),
....
))
// and then use schema.fieldsName... or
val cols: List[Columns] = ...
// in df.select(cols:_*)
3. get schema (list fields with json file)
package spark
import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DataType, StructType}
import scala.io.Source
object DFFieldsWithJson extends App {
val spark = SparkSession.builder()
.master("local")
.appName("DataFrame-example")
.getOrCreate()
import spark.implicits._
case class TestData (
id: Int,
firstName: String,
lastName: String,
descr: String
)
val dataTestDF = Seq(
TestData(1, "First Name 1", "Last Name 1", "Description 1"),
TestData(2, "First Name 2", "Last Name 2", "Description 2"),
TestData(3, "First Name 3", "Last Name 3", "Description 3")
).toDF()
dataTestDF.show(false)
// +---+------------+-----------+-------------+
// |id |firstName |lastName |descr |
// +---+------------+-----------+-------------+
// |1 |First Name 1|Last Name 1|Description 1|
// |2 |First Name 2|Last Name 2|Description 2|
// |3 |First Name 3|Last Name 3|Description 3|
// +---+------------+-----------+-------------+
val schemaJson =
"""{ "type" : "struct",
|"fields" : [
|{
| "name" : "id",
| "type" : "integer",
| "nullable" : true,
| "metadata" : { }
| },
| {
| "name" : "firstName",
| "type" : "string",
| "nullable" : true,
| "metadata" : {}
| },
| {
| "name" : "lastName",
| "type" : "string",
| "nullable" : true,
| "metadata" : {}
| }
| ]}""".stripMargin
val schemaSource = schemaJson.mkString
val schemaFromJson = DataType.fromJson(schemaSource).asInstanceOf[StructType]
println(schemaFromJson)
// StructType(StructField(id,IntegerType,true), StructField(firstName,StringType,true), StructField(lastName,StringType,true))
val cols: List[String] = schemaFromJson.fieldNames.toList
val col: List[Column] = cols.map(dataTestDF(_))
val df = dataTestDF.select(col: _*)
df.printSchema()
// root
// |-- id: integer (nullable = false)
// |-- firstName: string (nullable = true)
// |-- lastName: string (nullable = true)
df.show(false)
// +---+------------+-----------+
// |id |firstName |lastName |
// +---+------------+-----------+
// |1 |First Name 1|Last Name 1|
// |2 |First Name 2|Last Name 2|
// |3 |First Name 3|Last Name 3|
// +---+------------+-----------+
}
}