[尝试根据另一个数据框的列过滤pyspark数据框,例如我有一些tsv文件,例如...
test11.txt.gz
name id
a 1234
b 5678
c 7890
test12.txt.gz
name id
a 1234
f 1010
c 7890
并尝试根据其他test12数据框过滤test11数据框,以得到类似...的结果
name id
a 1234
c 7890
当前尝试使用类似...的代码
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.functions import *
>>> sparkSession = SparkSession.builder.appName("data_debugging").getOrCreate()
# reading a parquet or tsv
>>> df1 = sparkSession.read.option("header", "true").option("sep", "true").csv("hdfs://hw001.co.local/tmp/test11.tsv.gz")
>>> df2 = sparkSession.read.option("header", "true").option("sep", "true").csv("hdfs://hw001.co.local/tmp/test12.tsv.gz")
# first trying like this
>>> df1[df1["name"].isin(df2["name"])]
...
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'Resolved attribute(s) name#33 missing from name#10,id#11 in operator !Filter name#10 IN (name#33). Attribute(s) with the same name appear in the operation: name. Please check if the right attribute(s) are used.;;\n!Filter name#10 IN (name#33)\n+- Relation[name#10,id#11] csv\n'
# then even with changing the column name for one of the dataframes like...
>>> df1[df1["name"].isin(df2.withColumnRenamed("name", "__FILTER")["__FILTER"])]
....
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'Resolved attribute(s) __FILTER#52 missing from name#10,id#11 in operator !Filter name#10 IN (__FILTER#52).;;\n!Filter name#10 IN (__FILTER#52)\n+- Relation[name#10,id#11] csv\n'
因此,目前尚不确定该怎么做,也没有足够的pyspark经验来解释由此产生的错误消息。有人知道该怎么做吗?
要注意的一件事是,在我的实际用例中,表可能具有许多其他不同的列(它们在每个表中可能具有相同的名称但含义不同),因此不确定在此处进行连接是否正确,因为只需要[AFK]。[-]
[尝试根据另一个数据框的列过滤pyspark数据框,例如我有一些tsv文件,例如... test11.txt.gz名称id a 1234 b 5678 c 7890 test12.txt.gz名称...
正在执行