下面生成的随机数按预期每行都不同。如此精细。但我显然在思考中缺少一些基本方面。
from pyspark.sql import functions as F
df = spark.range(10).withColumn("randomNum",F.rand())
df.show(truncate=False)
返回:
+---+-------------------+
|id |randomNum |
+---+-------------------+
|0 |0.8128581612050234 |
|1 |0.40656852491856355|
|2 |0.9444869347865689 |
|3 |0.10391423680687417|
|4 |0.05285485891027453|
|5 |0.5140906081158558 |
|6 |0.900727341820192 |
|7 |0.11046600268909801|
|8 |0.6509183512961298 |
|9 |0.5060097759646045 |
+---+-------------------+
然后再次调用
show()
- 下面的特殊操作,为什么我们再次得到与第一轮相同的随机数序列? show()
是否在某些方面重写了传统的 Action 方法,因为它认为它是相同的 DF?如果是这样,则不适用于 pyspark 等中的所有方法。我在 Databricks Notebook 的 2 个单元格中运行它。
查看 SPARK UI,它使用相同的种子两次。为什么?确定性方面,似乎与我们所学的“行动”概念不一致。
df.show(truncate=False)
这是每个分区仅随机初始化一次的结果。 来源。
因此,如果分区布局未更改,则在每次连续执行中都会使用相同的初始种子,因此
rand
会生成相同的序列。
当分区不固定时,
rand
的行为变得不确定,这通过Spark JIRA-13380进行了记录(至少在源代码中)。
不确定您是否仍然遇到问题,但找到了一篇不错的博客文章。所以基本上他不是用 PRNG 创建 RN,而是对排序数据帧的行号使用哈希函数。这确保了生成的随机数是可再现的和确定性的。正如他提到的,这种方法在很大程度上取决于哈希算法的质量。根据我的快速而肮脏的实现,whhc 使用spark 2.4.8,这就是为什么我不使用 xxhash 而不是使用 Murmurhash3 算法的 PySpark 的 .hash() 方法。
import pyspark.sql.functions as F
from pyspark.sql.functions import monotonically_increasing_id
def distributed_random_number(indf:pyspark.sql.DataFrame, col_name:str, unique_keys:list) -> pyspark.sql.DataFrame:
"""
This function creates a new column with a deterministic random number integer. Other implementations in PySpark like rand() are not determnistic!
Arguments:
----------
indf:pyspark.sql.DataFrame
col_name:str
Column name of the new random number column.
unique_keys:list
Unique keys inside the dataframe, which are needed for sorting in the beginning.
Returns:
----------
pyspark.sql.DataFrame
Input dataframe including the new random number column.
"""
if isinstance(unique_keys, str):
unique_keys = [unique_keys]
if not all(col in indf.columns for col in unique_keys) or col_name in indf.columns:
raise ValueError(f"Either the given 'col_name' is already in the indf columns or not all columns in 'unique_keys' are in the indf columns.")
return indf.orderBy(unique_keys).\
withColumn('monotonically_increasing_id_tmp', monotonically_increasing_id()).\
withColumn(col_name, F.hash('monotonically_increasing_id_tmp')).drop('monotonically_increasing_id_tmp')
df_with_random_col = distributed_random_number(df_without_random_col, 'random_col', 'unique_id')
希望这有帮助! :)