我是代码存储库的首次发布者和用户,所以请原谅我的措辞。我正在尝试从充满 RID 的数据集中读取 RID。例如,“backing_dataset_rid”列包含保存在 Foundry 应用程序内的 RID。我正在努力加载 RID 以从该数据集中读取列,并将结果保存回提取 RID 的原始数据集中。我相信这个链接很有帮助,但如果可能的话,我正在 PySpark 中寻找解决方案:(如何在 Palantir Foundry 中联合多个动态输入?)
截至目前,它返回“Transform(myproject.datasets.examples:extract_cols)”列表(隐藏敏感信息)
from pyspark.sql import functions as F
from transforms.api import transform, transform_df, Input, Output
def RID_extract(RID):
@transform_df(
Output('/folder_path/OutputDataset'),
data=Input(RID)
)
def extract_cols(data):
column_names = data.dataframe().columns
return column_names
return extract_cols
@transform_df(
Output("/folder_path/OutputDataset"),
source_df=Input("/folder_path/InputDataset")
)
def compute(source_df):
df = source_df
output_path = "/folder_path/OutputDataset"
rows=df.collect()
df2=[]
print(source_df.columns)
for row in rows:
if row['backing_dataset_rid'] == (None):
continue
RID = row['backing_dataset_rid']
print(RID)
RID_transform = RID_extract(RID)
df2.append(RID_transform)
#df.rdd.map(RID_transform).collect()
print(RID_transform)
return df2
Foundry 中的转换只能有一组静态输入和输出,在运行检查时定义(特别是出于安全原因)。
换句话说:您无法根据您的数据(或转换中无法在提交/检查时推断的任何逻辑)添加新的输入/输出。
因此,您正在寻找的确切行为可能是不可行的。
您需要执行以下操作:
from pyspark.sql import functions as F
from transforms.api import transform, transform_df, Input, Output
def RID_extract(RID):
[...]
return extract_cols
@transform_df(
Output("/folder_path/OutputDataset"),
source_df=Input("/folder_path/InputDataset")
source_df1=Input("RID1"),
source_df2=Input("RID2"),
source_df3=Input("RID3"),
...
source_dfn=Input("RIDn")
)
def compute(source_df, source_df1):
[...]
我认为这会违背你想要实现的目的。您可以使用像here这样的转换生成器,但您的输入集仍然需要是静态的。
替代方案:
注意:任何经过反向工程且未记录的 API 调用都可能会在没有事先警告、通知或更换的情况下中断。执行此操作需要您自担风险,尤其是对于生产工作流程。