如何在pyspark中过滤一个数据帧(例如,“从X中选择*,其中X.colx在(从Y中选择余数)”)?

问题描述 投票:1回答:1

[尝试根据另一个数据框的列过滤pyspark数据框,例如我有一些tsv文件,例如...

test11.txt.gz

name    id
a       1234
b       5678
c       7890


test12.txt.gz

name    id
a       1234
f       1010
c       7890

并尝试根据其他test12数据框过滤test11数据框,以得到类似...的结果

name    id
a       1234
c       7890

当前尝试使用类似...的代码

>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.functions import *

>>> sparkSession = SparkSession.builder.appName("data_debugging").getOrCreate()

# reading a parquet or tsv
>>> df1 = sparkSession.read.option("header", "true").option("sep", "true").csv("hdfs://hw001.co.local/tmp/test11.tsv.gz")
>>> df2 = sparkSession.read.option("header", "true").option("sep", "true").csv("hdfs://hw001.co.local/tmp/test12.tsv.gz")


# first trying like this
>>> df1[df1["name"].isin(df2["name"])]
...
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'Resolved attribute(s) name#33 missing from name#10,id#11 in operator !Filter name#10 IN (name#33). Attribute(s) with the same name appear in the operation: name. Please check if the right attribute(s) are used.;;\n!Filter name#10 IN (name#33)\n+- Relation[name#10,id#11] csv\n'


# then even with changing the column name for one of the dataframes like...
>>> df1[df1["name"].isin(df2.withColumnRenamed("name", "__FILTER")["__FILTER"])]
....
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'Resolved attribute(s) __FILTER#52 missing from name#10,id#11 in operator !Filter name#10 IN (__FILTER#52).;;\n!Filter name#10 IN (__FILTER#52)\n+- Relation[name#10,id#11] csv\n'

因此,目前尚不确定该怎么做,也没有足够的pyspark经验来解释由此产生的错误消息。有人知道该怎么做吗?


要注意的一件事是,在我的实际用例中,表可能具有许多其他不同的列(它们在每个表中可能具有相同的名称但含义不同),因此不确定在此处进行连接是否正确,因为只需要[AFK]。[-]

[尝试根据另一个数据框的列过滤pyspark数据框,例如我有一些tsv文件,例如... test11.txt.gz名称id a 1234 b 5678 c 7890 test12.txt.gz名称...

pyspark pyspark-sql
1个回答
0
投票

正在执行

© www.soinside.com 2019 - 2024. All rights reserved.