Pyspark 数据帧中的奇怪行为

问题描述 投票:0回答:0

我有以下包含两个字段 ID 和 QUARTER 的 pyspark 数据框:

pandas_df = pd.DataFrame({"ID":[1, 2, 3,4, 5, 3,5,6,3,7,2,6,8,9,1,7,5,1,10],"QUARTER":[1, 1, 1, 1, 1,2,2,2,3,3,3,3,3,4,4,5,5,5,5]})
spark_df = spark.createDataFrame(pandas_df)
spark_df.createOrReplaceTempView('spark_df')

我有以下列表,其中包含我想要的第 5 个季度中每个季度的条目数

numbers=[2,1,3,1,2]

我想每次从每个季度中选择的行数等于列表“数字”中指示的数字。我应该尊重

ID
最后应该是独一无二的。这意味着如果我在某个季度选择了一个ID,我不应该在另一个季度再次选择它。

为此,我使用了以下 pyspark 代码:


quart=1 # the first quarter
liste_unique=[] # an empty list that will contains the unique Id values to compare with
for i in range(0,len(numbers)):
  tmp=spark_df.where(spark_df.QUARTER==quart)# select only rows with the chosed quarter
  tmp=tmp.where(tmp.ID.isin(liste_unique)==False)# the selected id were not selected before
  w = Window().partitionBy(lit('col_count0')).orderBy(lit('col_count0'))#dummy column
  df_final=tmp.withColumn("row_num", row_number().over(w)).filter(col("row_num").between(1,numbers[i])) # number of rows needed from the 'numbers list'
  df_final=df_final.drop(col("row_num")) # drop the row num column
  liste_tempo=df_final.select(['ID']).rdd.map(lambda x : x[0]).collect() # transform the selected  id into list 

 liste_unique.extend(liste_tempo) # extend the list of unique id each time we select new rows from a quarter
  
  df0=df0.union(df_final) # union the empty list each time with the selected data in each quarter
  
  quart=quart+1 #increment the quarter

df0 只是一个开头的空列表。它将包含最后的所有数据,可以声明如下

spark = SparkSession.builder.appName('Empty_Dataframe').getOrCreate()
 
# Create an empty schema

columns = StructType([StructField('ID',
                                  StringType(), True),
                    StructField('QUARTER',
                                StringType(), True)
                      ])

df0 = spark.createDataFrame(data = [],
                           schema = columns)

代码工作正常,没有错误,除了我可以在不同的季度找到重复的 ID,这是不正确的。此外,一个奇怪的行为是当我试图计算 df0 数据帧中唯一 ID 的数量时(在一个新的不同单元格中)

print(df0.select('ID').distinct().count())

即使数据帧未与任何其他进程接触,它也会在每次执行时给出不同的值(与示例相比,更大的数据集更清晰)。我无法理解这种行为,我尝试使用

unpersist(True)
删除缓存或临时变量,但没有任何改变。我怀疑
Union
功能被错误使用,但我没有在 pyspark 中找到任何替代方法。

python apache-spark pyspark union pyspark-schema
© www.soinside.com 2019 - 2024. All rights reserved.