迭代 PySpark 数据帧并根据条件获取值

问题描述 投票:0回答:1

我对使用 PySpark 相当陌生,并且遇到以下问题。

假设我们有下表,实际上它有几千行:

+-----+------------+----------+-----------+
|   ID|       CODE |       TYP|       KIND|
+-----+------------+----------+-----------+
|10087|       BH   |         L|       D   |
|10066|       BS   |         B|       null|
|10094|       BL   |         L|       E   |
|10080|       BF   |         B|       null|

我想做的是迭代数据框。具体来说,我正在查看 TYP 和 KIND 列,并基于一个条件,例如如果我们有一行 TYP == L 且 KIND == D 我需要获取与该行对应的 ID 值,并在另一个数据帧中查找此 ID 值以便进一步加工。 我的问题是如何最好地解决这个问题,最好利用 PySpark 提供的并行化。

提前致谢!

python dataframe pyspark iteration lookup
1个回答
0
投票

这是您可以做到的一种方法。

from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext, SQLContext
import pyspark.sql.functions as F
from pyspark.sql import Window

from pyspark import SparkContext, SQLContext

sc = SparkContext('local')
sqlContext = SQLContext(sc)

data1 = [
    (10087, "BH", "L", "D"),
    (10066, "BS", "B", "null"),
    (10094, "BL", "L", "E"),
    (10080, "BF", "B", "null")
     ]

df1Columns = ["ID","CODE","TYP","KIND"]
df1 = sqlContext.createDataFrame(data=data1, schema = df1Columns)

print("df1 dataframe")
df1.show(truncate=False)

filtered_df1 = df1.filter( (F.col("TYP") == "L") & (F.col("KIND") == "D")).select(F.col("ID"))

print("filtered_df1 dataframe")
filtered_df1.show(truncate=False)

然后您可以加入过滤后的数据框。

上述代码的输出:

df1 dataframe
+-----+----+---+----+
|ID   |CODE|TYP|KIND|
+-----+----+---+----+
|10087|BH  |L  |D   |
|10066|BS  |B  |null|
|10094|BL  |L  |E   |
|10080|BF  |B  |null|
+-----+----+---+----+

filtered_df1 dataframe
+-----+
|ID   |
+-----+
|10087|
+-----+
© www.soinside.com 2019 - 2024. All rights reserved.