我正在使用 Palantir Workbook 从现有数据集构建新数据集。其中一个每个患者有 1 到 10 行。我的目标是循环遍历每个患者行并应用逻辑来识别具有不同详细信息的重复行或根据先前的行确定状态。最终,我想将最终结果输出到新的数据集,然后继续处理下一个患者行。
作为一名刚接触 Palantir (Spark SQL/Python) 的 MS SQL 开发人员,我正在努力寻找正确的方法。我最初的想法是使用 Python 代码,但我很难找到工作脚本来迭代行。不过,我愿意接受替代解决方案,例如在 SQL 中使用某种类型的游标(如果更合适的话)。
有人可以提供一个关于如何循环访问 Palantir 工作簿中的数据集并生成新数据集作为输出的小代码示例或指导吗?
我尝试使用我发现的以下脚本获得一个非常基本的开始:
def unnamed_2(SourceDataSet):
import pandas as pandas
patient_icn_values = []
for index, row in SourceDataSet.iterrows():
patient_icn_values.append(row['patient_icn'])
my_output = pd.DataFrame({'patient_icn': patient_icn_values})
return my_output
new_dataset = unnamed_2(SourceDataSet)
这是错误:
缺少变换属性
DataFrame 对象没有属性 iterrows。请检查对象的拼写和/或数据类型。
在不了解您的确切情况和需求的情况下,我的最初反应是建议避免对行进行迭代,而是尝试内置的 pyspark 功能。 Foundry 文档有有几个段落涉及到这一点。
以下是您可以在代码工作簿中运行的一些代码示例,这些示例可能有助于根据数据子集(例如每个患者)进行重复数据删除和计算新值。我在底部提供了示例数据,您可以将其作为数据集上传到 Foundry,然后在您的工作簿中使用。您可以在“日志”选项卡中查看示例输出:
def deduplicate_data(notional_patient_example):
from pyspark.sql import functions as F
from pyspark.sql import Window as W
# https://stackoverflow.com/questions/76757625/palantir-workbook-looping-through-a-dataset-and-creating-a-new-dataset-with-py
####################
# Check the Logs tab to see the output from the examples
####################
input_df = notional_patient_example
# https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.dropDuplicates.html
print("Dropping duplicate rows, taking into consideration only select columns")
input_df.dropDuplicates(["patient_id", "visit_ts", "procedure_name"]).show()
print(
"Dropping duplicate rows, taking into consideration only select columns + some new column we compute (in this case, I'm just getting the month but you could imagine something more complicated here)"
)
input_df.withColumn("visit_month", F.month("visit_ts")).dropDuplicates(
["patient_id", "visit_month", "provider_name"]
).show()
# https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Window.html
# https://www.palantir.com/docs/foundry/transforms-python/pyspark-window/index.html
print(
"Computing a new value based on the order of patient visits for a single patient. You could then dropDuplicates based on this new value."
)
window = (
W()
.partitionBy("patient_id")
.orderBy("visit_ts")
.rowsBetween(W.currentRow, W.unboundedFollowing)
)
input_df.withColumn(
"is_latest_visit", F.max("visit_ts").over(window) == F.col("visit_ts")
).show()
print(
"Or, you could find potential bad data by filtering to patients that have two 'most recent visits'. I'll leave it as an exercise to the reader to do this for Patient 3 who had an invalid procedure_Y on 2023-07-22."
)
input_df.withColumn(
"is_latest_visit", F.max("visit_ts").over(window) == F.col("visit_ts")
).withColumn(
"has_multiple_latest_visits",
F.sum(F.col("is_latest_visit").cast("integer")).over(window) > 1,
).filter(
F.col("has_multiple_latest_visits") == True
).show()
# Just to show how to output a dataset
output_df = input_df.dropDuplicates(["patient_id", "visit_ts", "procedure_name"])
return output_df
患者_id | 访问_ts | 患者姓名 | 程序名称 | 提供商名称 |
---|---|---|---|---|
1 | 7 月 19 日 | A | 程序_1 | 博士。 E |
1 | 7 月 19 日 | A | 程序_2 | 博士。 F |
1 | 7 月 21 日 | A | 程序_3 | 博士。 G |
1 | 7 月 21 日 | A | 程序_3 | 博士。 G |
2 | 7 月 19 日 | B | 程序_2 | 博士。 |
2 | 7 月 20 日 | B | 程序_2 | 博士。 |
2 | 7 月 21 日 | B | 程序_5 | 博士。 J |
2 | 7 月 22 日 | B | 程序_3 | 博士。 G |
2 | 7 月 23 日 | B | 程序_3 | 博士。 F |
3 | 7 月 22 日 | C | 程序_Y | 博士。 J |
3 | 7 月 23 日 | C | 程序_4 | 博士。 F |
4 | 7 月 19 日 | D | 程序_5 | 博士。 |
4 | 7 月 20 日 | D | 程序_5 | 博士。 J |
4 | 7 月 21 日 | D | 程序_5 | 博士。 G |
4 | 7 月 22 日 | D | 程序_2 | 博士。 J |
4 | 7 月 23 日 | D | 程序_1 | 博士。 E |