我已动态地将列列表(超过 100 个)添加到数据框中。带有负数的列是根据 bookID 列的值生成的列。在每一行中,只有一个动态列的值为 1,其余列将为 0。我需要合并具有相同前 3 列的行。列 bookId 需要聚合以包含设置为 1 且该聚合正在运行的列的值。动态库需要以这样的方式进行聚合:如果任何行中的任何动态列设置为1,则将其设置为1,否则设置为0。如何动态聚合列的总和?
columns = ["name", "last", "address", "bookID"]
books = [-23, -44, -97, -32, -57, -76]
| name| last| address|bookID|-23|-44|-97|-32|-57|-76|
+----------+---------+--------+------+---+---+---+---+---+---+
|firstname1|lastname1|address1| -97| 0| 0| 1| 0| 0| 0|
|firstname1|lastname1|address1| -23| 1| 0| 0| 0| 0| 0|
|firstname2|lastname2|address2| -23| 1| 0| 0| 0| 0| 0|
|firstname2|lastname2|address2| -76| 0| 0| 0| 0| 0| 1|
预期结果:
| name| last| address|bookID_values|-23|-44|-97|-32|-57|-76|
+----------+---------+--------+-------------+---+---+---+---+---+---+
|firstname1|lastname1|address1| [-97, -23]| 1| 0| 1| 0| 0| 0|
|firstname2|lastname2|address2| [-23, -76]| 1| 0| 0| 0| 0| 1|
我正在创建一个语句数组并将它们作为参数传递给 agg 函数,但我收到 [UNRESOLVED_COLUMN_WITH_SUGGESTION] 错误。
agg_exps= [collect_list("bookID").alias("bookID_values"),]
s = ""
for book in books:
column_name = str(book)
s=f"(sum('{column_name}' == 1, lit(1).otherwise(lit(0)))).alias('{column_name}')"
agg_exps.append(col(s))
agg_df = df.groupBy("name", "last", "address").agg(*agg_exps)
我还尝试过其他方法,例如:
s=f"when(sum('{column_name}' == 1, lit(1).otherwise(lit(0)))).alias('{column_name}')"
让我们修复您的代码:
agg_exps = [
F.collect_list('bookID').alias('bookID'),
*[F.max(str(book)).alias(str(book)) for book in books]
]
df1 = df.groupBy("name", "last", "address").agg(*agg_exps)
df1.show()
+----------+---------+--------+----------+---+---+---+---+---+---+
| name| last| address| bookID|-23|-44|-97|-32|-57|-76|
+----------+---------+--------+----------+---+---+---+---+---+---+
|firstname1|lastname1|address1|[-97, -23]| 1| 0| 1| 0| 0| 0|
|firstname2|lastname2|address2|[-23, -76]| 1| 0| 0| 0| 0| 1|
+----------+---------+--------+----------+---+---+---+---+---+---+