我在 Spark 中有一个表
employee_1
,具有属性 id 和 name(带数据),另一个表 employee_2
具有相同的属性。我想通过将 id 值增加 +1 来加载数据。
WITH EXP AS (SELECT ALIASNAME.ID+1 ID, ALIASNAME.NAME NAME FROM employee_1 ALIASNAME)
INSERT INTO TABLE employee_2 SELECT * FROM EXP;
我在 HDFS 位置有一个文件(包含数据)。
当我从后端运行测试程序时,它成功了。但数据未加载。
employee_2
是空的。
注:
如果您在 Hive 中运行上述 WITH 子句,它将成功并且数据将加载。但在 Spark 1.6 中就不会了。为什么?
WITH 语句不是问题,而是 INSERT INTO 语句造成了问题。
这是一个使用 .insertInto() 样式而不是“INSERT INTO”SQL 的工作示例:
val s = Seq((1,"foo"), (2, "bar"))
s: Seq[(Int, String)] = List((1,foo), (2,bar))
val df = s.toDF("id", "name")
df.registerTempTable("df")
sql("CREATE TABLE edf_final (id int, name string)")
val e = sql("WITH edf AS (SELECT id+1, name FROM df cook) SELECT * FROM edf")
e.insertInto("edf_final")
另一种选择是使用
df.write.mode("append").saveAsTable("edf_final")
样式。
由于答案是针对 Spark 2.x, 我正在用 Spark 3 方式重写它。
%scala
import org.apache.spark.sql.functions.col
val s = Seq((1,"foo"), (2, "bar"))
val df = s.toDF("id", "name")
df.createOrReplaceTempView("df")
spark.sql("CREATE TABLE if not exists edf_final (id int, name string)")
val e = spark.sql("WITH edf AS (SELECT id+1 AS id, name FROM df) SELECT * FROM edf")
e.select(col("id"), col("name")).write.insertInto("edf_final")
spark.sql("select * from edf_final").show
结果:
df:org.apache.spark.sql.DataFrame
id:integer
name:string
e:org.apache.spark.sql.DataFrame
id:integer
name:string
+---+----+
| id|name|
+---+----+
| 2| foo|
| 3| bar|
+---+----+