运行 show() 两次对于 Dataframe 的 rand() 函数给出相同的结果

问题描述 投票:0回答:2

下面生成的随机数按预期每行都不同。如此精细。但我显然在思考中缺少一些基本方面。

from pyspark.sql import functions as F
df = spark.range(10).withColumn("randomNum",F.rand())
df.show(truncate=False)

返回:

+---+-------------------+
|id |randomNum          |
+---+-------------------+
|0  |0.8128581612050234 |
|1  |0.40656852491856355|
|2  |0.9444869347865689 |
|3  |0.10391423680687417|
|4  |0.05285485891027453|
|5  |0.5140906081158558 |
|6  |0.900727341820192  |
|7  |0.11046600268909801|
|8  |0.6509183512961298 |
|9  |0.5060097759646045 |
+---+-------------------+

然后再次调用

show()
- 下面的特殊操作,为什么我们再次得到与第一轮相同的随机数序列?
show()
是否在某些方面重写了传统的 Action 方法,因为它认为它是相同的 DF?如果是这样,则不适用于 pyspark 等中的所有方法。我在 Databricks Notebook 的 2 个单元格中运行它。

查看 SPARK UI,它使用相同的种子两次。为什么?确定性方面,似乎与我们所学的“行动”概念不一致。

df.show(truncate=False)
dataframe apache-spark pyspark apache-spark-sql databricks
2个回答
1
投票

这是每个分区仅随机初始化一次的结果。 来源

因此,如果分区布局未更改,则在每次连续执行中都会使用相同的初始种子,因此

rand
会生成相同的序列。

当分区不固定时,

rand
的行为变得不确定,这通过Spark JIRA-13380进行了记录(至少在源代码中)。


0
投票

不确定您是否仍然遇到问题,但找到了一篇不错的博客文章。所以基本上他不是用 PRNG 创建 RN,而是对排序数据帧的行号使用哈希函数。这确保了生成的随机数是可再现的和确定性的。正如他提到的,这种方法在很大程度上取决于哈希算法的质量。根据我的快速而肮脏的实现,whhc 使用spark 2.4.8,这就是为什么我不使用 xxhash 而不是使用 Murmurhash3 算法的 PySpark 的 .hash() 方法。

import pyspark.sql.functions as F
from pyspark.sql.functions import monotonically_increasing_id

def distributed_random_number(indf:pyspark.sql.DataFrame, col_name:str, unique_keys:list) -> pyspark.sql.DataFrame:
    """
    This function creates a new column with a deterministic random number integer. Other implementations in PySpark like rand() are not determnistic! 
    
    Arguments: 
    ----------
    indf:pyspark.sql.DataFrame
    col_name:str
        Column name of the new random number column. 
    unique_keys:list
        Unique keys inside the dataframe, which are needed for sorting in the beginning. 
    
    Returns: 
    ----------
    pyspark.sql.DataFrame
        Input dataframe including the new random number column. 
    """
    if isinstance(unique_keys, str): 
        unique_keys = [unique_keys]
    if not all(col in indf.columns for col in unique_keys) or col_name in indf.columns: 
        raise ValueError(f"Either the given 'col_name' is already in the indf columns or not all columns in 'unique_keys' are in the indf columns.")
    
    return indf.orderBy(unique_keys).\
            withColumn('monotonically_increasing_id_tmp', monotonically_increasing_id()).\
            withColumn(col_name, F.hash('monotonically_increasing_id_tmp')).drop('monotonically_increasing_id_tmp')

df_with_random_col = distributed_random_number(df_without_random_col, 'random_col', 'unique_id')

希望这有帮助! :)

© www.soinside.com 2019 - 2024. All rights reserved.