我想在 Scala 中使用指定的模式在
DataFrame
上创建。我尝试使用 JSON 读取(我的意思是读取空文件),但我认为这不是最佳实践。
假设您想要一个具有以下模式的数据框:
root
|-- k: string (nullable = true)
|-- v: integer (nullable = false)
您只需定义数据框的架构并使用空
RDD[Row]
:
import org.apache.spark.sql.types.{
StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row
val schema = StructType(
StructField("k", StringType, true) ::
StructField("v", IntegerType, false) :: Nil)
// Spark < 2.0
// sqlContext.createDataFrame(sc.emptyRDD[Row], schema)
spark.createDataFrame(sc.emptyRDD[Row], schema)
PySpark 等效项几乎相同:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
StructField("k", StringType(), True), StructField("v", IntegerType(), False)
])
# or df = sc.parallelize([]).toDF(schema)
# Spark < 2.0
# sqlContext.createDataFrame([], schema)
df = spark.createDataFrame([], schema)
使用带有
Product
类型的隐式编码器(仅限 Scala),例如 Tuple
:
import spark.implicits._
Seq.empty[(String, Int)].toDF("k", "v")
或案例类别:
case class KV(k: String, v: Int)
Seq.empty[KV].toDF
或
spark.emptyDataset[KV].toDF
从 Spark 2.0.0 开始,您可以执行以下操作。
让我们定义一个
Person
案例类:
scala> case class Person(id: Int, name: String)
defined class Person
导入
spark
SparkSession 隐式Encoders
:
scala> import spark.implicits._
import spark.implicits._
并使用 SparkSession 创建一个空的
Dataset[Person]
:
scala> spark.emptyDataset[Person]
res0: org.apache.spark.sql.Dataset[Person] = [id: int, name: string]
您还可以使用架构“DSL”(请参阅org.apache.spark.sql.ColumnName中的DataFrames的支持函数)。
scala> val id = $"id".int
id: org.apache.spark.sql.types.StructField = StructField(id,IntegerType,true)
scala> val name = $"name".string
name: org.apache.spark.sql.types.StructField = StructField(name,StringType,true)
scala> import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructType
scala> val mySchema = StructType(id :: name :: Nil)
mySchema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(name,StringType,true))
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
scala> val emptyDF = spark.createDataFrame(sc.emptyRDD[Row], mySchema)
emptyDF: org.apache.spark.sql.DataFrame = [id: int, name: string]
scala> emptyDF.printSchema
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
创建空数据集的Java版本:
public Dataset<Row> emptyDataSet(){
SparkSession spark = SparkSession.builder().appName("Simple Application")
.config("spark.master", "local").getOrCreate();
Dataset<Row> emptyDataSet = spark.createDataFrame(new ArrayList<>(), getSchema());
return emptyDataSet;
}
public StructType getSchema() {
String schemaString = "column1 column2 column3 column4 column5";
List<StructField> fields = new ArrayList<>();
StructField indexField = DataTypes.createStructField("column0", DataTypes.LongType, true);
fields.add(indexField);
for (String fieldName : schemaString.split(" ")) {
StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);
return schema;
}
import scala.reflect.runtime.{universe => ru}
def createEmptyDataFrame[T: ru.TypeTag] =
hiveContext.createDataFrame(sc.emptyRDD[Row],
ScalaReflection.schemaFor(ru.typeTag[T].tpe).dataType.asInstanceOf[StructType]
)
case class RawData(id: String, firstname: String, lastname: String, age: Int)
val sourceDF = createEmptyDataFrame[RawData]
在这里,您可以使用 scala 中的 StructType 创建模式并传递空 RDD,以便您能够创建空表。 以下代码是相同的。
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.BooleanType
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.types.StringType
//import org.apache.hadoop.hive.serde2.objectinspector.StructField
object EmptyTable extends App {
val conf = new SparkConf;
val sc = new SparkContext(conf)
//create sparksession object
val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()
//Created schema for three columns
val schema = StructType(
StructField("Emp_ID", LongType, true) ::
StructField("Emp_Name", StringType, false) ::
StructField("Emp_Salary", LongType, false) :: Nil)
//Created Empty RDD
var dataRDD = sc.emptyRDD[Row]
//pass rdd and schema to create dataframe
val newDFSchema = sparkSession.createDataFrame(dataRDD, schema)
newDFSchema.createOrReplaceTempView("tempSchema")
sparkSession.sql("create table Finaltable AS select * from tempSchema")
}
这对于测试目的很有帮助。
Seq.empty[String].toDF()
这是在 pyspark 2.0.0 或更高版本中创建空数据框的解决方案。
from pyspark.sql import SQLContext
sc = spark.sparkContext
schema = StructType([StructField('col1', StringType(),False),StructField('col2', IntegerType(), True)])
sqlContext.createDataFrame(sc.emptyRDD(), schema)
我有一个特殊的要求,其中我已经有一个数据框,但在特定条件下我必须返回一个空数据框,所以我返回了
df.limit(0)
。
我想添加以下尚未提及的语法:
Seq[(String, Integer)]().toDF("k", "v")
它清楚地表明
()
部分是用于值的。它是空的,所以数据框是空的。
此语法也有利于手动添加
null
值。它只是有效,而其他选项要么不起作用,要么过于冗长。
在转换为 Spark 13.3/在 Databricks 中启用 Unity Catalog 后,我们遇到了
emptyRDD
方法的问题。以下解决方案可替代两者。
import org.apache.spark.sql.types.{StructType, StringType}
import org.apache.spark.sql.Row
import java.util.ArrayList
val schema = new StructType()
.add("column1", StringType, true)
.add("column2", StringType, true)
val df = spark.createDataFrame(
new ArrayList[Row],
schema
)
df.count()
从 Spark 2.4.3 开始
val df = SparkSession.builder().getOrCreate().emptyDataFrame