我有一个 PySpark DataFrame -
valuesCol = [('Sweden',31),('Norway',62),('Iceland',13),('Finland',24),('Denmark',52)]
df = sqlContext.createDataFrame(valuesCol,['name','id'])
+-------+---+
| name| id|
+-------+---+
| Sweden| 31|
| Norway| 62|
|Iceland| 13|
|Finland| 24|
|Denmark| 52|
+-------+---+
我想在这个DataFrame中添加一个行列,即该行的行号(序列号),如下图-
我的最终输出应该是:
+-------+---+--------+
| name| id|row_num |
+-------+---+--------+
| Sweden| 31| 1|
| Norway| 62| 2|
|Iceland| 13| 3|
|Finland| 24| 4|
|Denmark| 52| 5|
+-------+---+--------+
我的 Spark 版本是
2.2
我正在尝试这段代码,但它不起作用-
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window
w = Window().orderBy()
df = df.withColumn("row_num", row_number().over(w))
df.show()
我收到错误:
AnalysisException: 'Window function row_number() requires window to be ordered, please add ORDER BY clause. For example SELECT row_number()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table;'
如果我理解正确,我需要订购一些列,但我不想要这样的东西
w = Window().orderBy('id')
,因为这会重新排序整个DataFrame。
任何人都可以建议如何使用
row_number()
函数实现上述输出吗?
您应该为 order 子句定义列。如果您不需要对值进行排序,则编写一个虚拟值。尝试下面;
from pyspark.sql.functions import row_number,lit
from pyspark.sql.window import Window
w = Window().orderBy(lit('A'))
df = df.withColumn("row_num", row_number().over(w))
我遇到了类似的问题,但就我而言,@Ali Yesilli 的解决方案失败了,因为我分别读取多个输入文件并最终将它们全部合并到一个数据框中。在这种情况下,由虚拟变量排序的窗口内的顺序被证明是不可预测的。
monotonically_increasing_id
:
df = df.withColumn('original_order', monotonically_increasing_id())
df = df.withColumn('row_num', row_number().over(Window.orderBy('original_order')))
df = df.drop('original_order')
my_data_df.createOrReplaceTempView("my_data")
my_data_indexed_df = spark.sql("select row_number() over (order by (select null)) as row_num,* from my_data")