如何解决Spark中的AnalysisException:resolved属性

问题描述 投票:14回答:6
val rdd = sc.parallelize(Seq(("vskp", Array(2.0, 1.0, 2.1, 5.4)),("hyd",Array(1.5, 0.5, 0.9, 3.7)),("hyd", Array(1.5, 0.5, 0.9, 3.2)),("tvm", Array(8.0, 2.9, 9.1, 2.5))))
val df1= rdd.toDF("id", "vals")
val rdd1 = sc.parallelize(Seq(("vskp","ap"),("hyd","tel"),("bglr","kkt")))
val df2 = rdd1.toDF("id", "state")
val df3 = df1.join(df2,df1("id")===df2("id"),"left")

连接操作工作正常,但是当我重用df2时,我面临未解决的属性错误

val rdd2 = sc.parallelize(Seq(("vskp", "Y"),("hyd", "N"),("hyd", "N"),("tvm", "Y")))
val df4 = rdd2.toDF("id","existance")
val df5 = df4.join(df2,df4("id")===df2("id"),"left")

错误:org.apache.spark.sql.AnalysisException:已解析的属性ID#426

java scala spark-dataframe
6个回答
21
投票

正如我在评论中提到的,它与https://issues.apache.org/jira/browse/SPARK-10925有关,更具体地说是https://issues.apache.org/jira/browse/SPARK-14948。重复使用引用会在命名时产生歧义,因此您必须克隆df - 请参阅https://issues.apache.org/jira/browse/SPARK-14948中的最后一条注释。


3
投票

如果您有df1和df2派生自df1,请尝试重命名df2中的所有列,以便在连接后没有两列具有相同的名称。所以在加入之前:

所以而不是df1.join(df2...

# Step 1 rename shared column names in df2.
df2_renamed = df2.withColumnRenamed('columna', 'column_a_renamed').withColumnRenamed('columnb', 'column_b_renamed')

# Step 2 do the join on the renamed df2 such that no two columns have same name.
df1.join(df2_renamed)

1
投票

尝试在两个连续的连接中使用一个DataFrame时,我遇到了同样的问题。

问题在于:DataFrame A有2列(我们称之为x和y),DataFrame B也有2列(让我们称之为w和z)。我需要在x = z上加入A和B,然后在y = z上将它们连接在一起。

(A join B on A.x=B.z) as C join B on C.y=B.z

我得到了确切的错误,在第二次加入时它抱怨“已解决的属性B.z#1234 ......”。

根据@Erik提供的链接以及其他一些博客和问题,我收集了我需要克隆的B.

这是我做的:

val aDF = ...
val bDF = ...
val bCloned = spark.createDataFrame(bDF.rdd, bDF.schema)
aDF.join(bDF, aDF("x") === bDF("z")).join(bCloned, aDF("y") === bCloned("z"))

0
投票

对于java开发人员,请尝试调用此方法:

private static Dataset<Row> cloneDataset(Dataset<Row> ds) {
    List<Column> filterColumns = new ArrayList<>();
    List<String> filterColumnsNames = new ArrayList<>();
    scala.collection.Iterator<StructField> it = ds.exprEnc().schema().toIterator();
    while (it.hasNext()) {
        String columnName = it.next().name();
        filterColumns.add(ds.col(columnName));
        filterColumnsNames.add(columnName);
    }
    ds = ds.select(JavaConversions.asScalaBuffer(filterColumns).seq()).toDF(scala.collection.JavaConverters.asScalaIteratorConverter(filterColumnsNames.iterator()).asScala().toSeq());
    return ds;
}

在加入之前的两个数据集上,它将数据集克隆为新数据集:

df1 = cloneDataset(df1); 
df2 = cloneDataset(df2);
Dataset<Row> join = df1.join(df2, col("column_name"));
// if it didn't work try this
final Dataset<Row> join = cloneDataset(df1.join(df2, columns_seq)); 

0
投票

如果您执行以下操作,它将起作用。

假设你有一个数据帧。 df1如果你想交叉加入相同的数据帧,你可以使用下面的

df1.toDF("ColA","ColB").as("f_df").join(df1.toDF("ColA","ColB").as("t_df"), 
   $"f_df.pcmdty_id" === 
   $"t_df.assctd_pcmdty_id").select($"f_df.pcmdty_id",$"f_df.assctd_pcmdty_id")

0
投票

根据我的经验,我们有2个解决方案1)克隆DF 2)在连接表之前重命名具有歧义的列。 (不要忘记删除重复的连接键)

我个人更喜欢第二种方法,因为在第一种方法中克隆DF需要时间,特别是如果数据大小很大。

© www.soinside.com 2019 - 2024. All rights reserved.