我有一个 DFHelper 类,它可以帮助获取数据帧键。 我想将其维护为通用代码并从另一个主 scala 对象调用它。例如,我为泛型类定义的第一个代码部分,第二个代码部分是我的主要对象。在主对象内部我想调用 DFHelper 类
package integration.utils
import org.apache.spark.sql.types.{ArrayType, StructType, TimestampType}
import org.apache.spark.sql.DataFrame
implicit class DFHelpers(df: DataFrame) {
def fields: Seq[String] =
this.fields(df.schema)
def fields(
schema: StructType = df.schema,
root: String = "",
sep: String = "."
): Seq[String] = {
schema.fields.flatMap { column =>
column match {
case _ if column.dataType.isInstanceOf[StructType] =>
fields(
column.dataType.asInstanceOf[StructType],
s"${root}${sep}`${column.name}`".stripPrefix(sep)
)
case _ if column.dataType.isInstanceOf[ArrayType] =>
column.dataType
.asInstanceOf[ArrayType]
.productIterator
.filter(_.isInstanceOf[StructType])
.map(_.asInstanceOf[StructType])
.flatMap(f => fields(f, s"${root}${sep}`${column.name}`".stripPrefix(sep)))
case _ => Seq(s"${root}${sep}`${column.name}`".stripPrefix(sep))
}
}.toList
}
};
主梯级
package integration.scipts
import org.apache.spark.sql.types.{ArrayType, StructType, TimestampType}
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, explode, lit, struct,explode_outer,arrays_zip,current_timestamp,expr, typedLit, array_union}
import integration.utils.DFHelpers
object mainDataProcessing {
/**
* This is the method that Cradle will call when your job starts.
*/
def execute(spark: SparkSession,
input: Iterable[Dataset[Row]]): Dataset[Row] = {
input_df=spark.read_json("path");
val inputFieldMap = typedLit(inputDF.fields.map(f => f -> f).toMap);
}
}
我想,“调用类”你指的是这段代码:
inputDF.fields
。
您可以使用
导入
DFHelpers
import integration.utils.DFHelpers
或者只是
import integration.utils._
然后
inputDF.fields
应该可以编译。
编译器应该自动将
inputDF.fields
脱糖为 new DFHelpers(inputDF).fields
。
https://docs.scala-lang.org/overviews/core/implicit-classes.html