我有以下包含两个字段 ID 和 QUARTER 的 pyspark 数据框:
pandas_df = pd.DataFrame({"ID":[1, 2, 3,4, 5, 3,5,6,3,7,2,6,8,9,1,7,5,1,10],"QUARTER":[1, 1, 1, 1, 1,2,2,2,3,3,3,3,3,4,4,5,5,5,5]})
spark_df = spark.createDataFrame(pandas_df)
spark_df.createOrReplaceTempView('spark_df')
我有以下列表,其中包含我想要的第 5 个季度中每个季度的条目数
numbers=[2,1,3,1,2]
我想每次从每个季度中选择的行数等于列表“数字”中指示的数字。我应该尊重
ID
最后应该是独一无二的。这意味着如果我在某个季度选择了一个ID,我不应该在另一个季度再次选择它。
为此,我使用了以下 pyspark 代码:
quart=1 # the first quarter
liste_unique=[] # an empty list that will contains the unique Id values to compare with
for i in range(0,len(numbers)):
tmp=spark_df.where(spark_df.QUARTER==quart)# select only rows with the chosed quarter
tmp=tmp.where(tmp.ID.isin(liste_unique)==False)# the selected id were not selected before
w = Window().partitionBy(lit('col_count0')).orderBy(lit('col_count0'))#dummy column
df_final=tmp.withColumn("row_num", row_number().over(w)).filter(col("row_num").between(1,numbers[i])) # number of rows needed from the 'numbers list'
df_final=df_final.drop(col("row_num")) # drop the row num column
liste_tempo=df_final.select(['ID']).rdd.map(lambda x : x[0]).collect() # transform the selected id into list
liste_unique.extend(liste_tempo) # extend the list of unique id each time we select new rows from a quarter
df0=df0.union(df_final) # union the empty list each time with the selected data in each quarter
quart=quart+1 #increment the quarter
df0 只是一个开头的空列表。它将包含最后的所有数据,可以声明如下
spark = SparkSession.builder.appName('Empty_Dataframe').getOrCreate()
# Create an empty schema
columns = StructType([StructField('ID',
StringType(), True),
StructField('QUARTER',
StringType(), True)
])
df0 = spark.createDataFrame(data = [],
schema = columns)
代码工作正常,没有错误,除了我可以在不同的季度找到重复的 ID,这是不正确的。此外,一个奇怪的行为是当我试图计算 df0 数据帧中唯一 ID 的数量时(在一个新的不同单元格中)
print(df0.select('ID').distinct().count())
即使数据帧未与任何其他进程接触,它也会在每次执行时给出不同的值(与示例相比,更大的数据集更清晰)。我无法理解这种行为,我尝试使用
unpersist(True)
删除缓存或临时变量,但没有任何改变。我怀疑 Union
功能被错误使用,但我没有在 pyspark 中找到任何替代方法。