我对使用 PySpark 相当陌生,并且遇到以下问题。
假设我们有下表,实际上它有几千行:
+-----+------------+----------+-----------+
| ID| CODE | TYP| KIND|
+-----+------------+----------+-----------+
|10087| BH | L| D |
|10066| BS | B| null|
|10094| BL | L| E |
|10080| BF | B| null|
我想做的是迭代数据框。具体来说,我正在查看 TYP 和 KIND 列,并基于一个条件,例如如果我们有一行 TYP == L 且 KIND == D 我需要获取与该行对应的 ID 值,并在另一个数据帧中查找此 ID 值以便进一步加工。 我的问题是如何最好地解决这个问题,最好利用 PySpark 提供的并行化。
提前致谢!
这是您可以做到的一种方法。
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext, SQLContext
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark import SparkContext, SQLContext
sc = SparkContext('local')
sqlContext = SQLContext(sc)
data1 = [
(10087, "BH", "L", "D"),
(10066, "BS", "B", "null"),
(10094, "BL", "L", "E"),
(10080, "BF", "B", "null")
]
df1Columns = ["ID","CODE","TYP","KIND"]
df1 = sqlContext.createDataFrame(data=data1, schema = df1Columns)
print("df1 dataframe")
df1.show(truncate=False)
filtered_df1 = df1.filter( (F.col("TYP") == "L") & (F.col("KIND") == "D")).select(F.col("ID"))
print("filtered_df1 dataframe")
filtered_df1.show(truncate=False)
然后您可以加入过滤后的数据框。
上述代码的输出:
df1 dataframe
+-----+----+---+----+
|ID |CODE|TYP|KIND|
+-----+----+---+----+
|10087|BH |L |D |
|10066|BS |B |null|
|10094|BL |L |E |
|10080|BF |B |null|
+-----+----+---+----+
filtered_df1 dataframe
+-----+
|ID |
+-----+
|10087|
+-----+