I'm writing a small PoC that tries to rewrite a piece of logic written in plain Python into PySpark. The logic processes logs stored in SQLite one at a time:
logs = [...]
processed_logs = []
previous_log = EmptyDecoratedLog()  # empty sentinel
for log in logs:
    processed_log = with_outlet_value_closed(log, previous_log)
    previous_log = processed_log
    processed_logs.append(processed_log)
and
def with_outlet_value_closed(current_entry: DecoratedLog, previous_entry: DecoratedLog) -> DecoratedLog:
    if current_entry.sourceName == "GS2":
        current_entry.outletValveClosed = current_entry.eventData
    else:
        current_entry.outletValveClosed = previous_entry.outletValveClosed
    return current_entry
which I tried to express in the PySpark API as:
import pyspark.sql.functions as f
from pyspark.sql.window import Window as W

window = W.orderBy("ID")  # ID is a unique id on those logs

df.withColumn(
    "testValveOpened",
    f.when(f.col("sourceName") == "GS2", f.col("eventData"))
     .otherwise(f.lag("testValveOpened").over(window)),
)
But this raises: AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name outletValveClosed cannot be resolved.
So my question is: is it possible to express code like this, where the value of the current row depends on the value of the same column in the previous row? (I know this forces all records to be processed on a single thread, but that's fine here.)
I tried initializing the column first:
df = df.withColumn("testValveOpened", f.lit(0))
df.withColumn(
    "testValveOpened",
    f.when(f.col("sourceName") == "GS2", f.col("eventData"))
     .otherwise(f.lag("testValveOpened").over(window)),
)
but then I get:
ID |sourceName|eventData|testValveOpened
1 |GS3 |1 |0
2 |GS2 |1 |1
3 |GS2 |1 |1
4 |GS1 |1 |0
5 |GS1 |1 |0
6 |ABC |0 |0
7 |B123 |0 |0
8 |B423 |0 |0
9 |PTSD |168 |0
10 |XCD |0 |0
whereas what I want is:
ID |sourceName|eventData|testValveOpened
1 |GS3 |1 |0
2 |GS2 |1 |1
3 |GS2 |1 |1
4 |GS1 |1 |1
5 |GS1 |1 |1
6 |ABC |0 |1
7 |B123 |0 |1
8 |B423 |0 |1
9 |PTSD |168 |1
10 |XCD |0 |1
So when the row is a GS2, take the value of eventData; otherwise carry forward the previous value of testValveOpened.
You have to restructure the logic slightly, because you can't update rows "one by one". First flag the GS2 rows:
df.withColumn("testValveOpened", f.when(f.col("sourceName") == "GS2", f.lit(1)).otherwise(f.lit(0)))
Then take a running sum over the window and compare, to check whether a GS2 has occurred at or before the current row:
df.withColumn("testValveOpened", (f.sum("testValveOpened").over(window) >= 1).cast("int"))