我正在构建从各种远程系统收集数据到中央 Spark 集群的系统。我正在使用 Delta 格式保存本地数据。
收集后,我想从 Delta 的日志中获取收集的行数。 我正在使用这段代码:
def __get_job_name(self) -> str:
all_cluster_tags = {}
for tag in json.loads(
self.spark.conf.get("spark.databricks.clusterUsageTags.clusterAllTags")
):
all_cluster_tags[tag["key"]] = tag["value"]
return all_cluster_tags.get("RunName")
def __extract_delta_operation_data(self) -> tuple[int, int]:
delta_table = DeltaTable.forPath(self.spark, self.target_table_path)
history_df = delta_table.history()
count, bytes_size = 0, 0
if not is_local_spark():
curr_job_name = self.__get_job_name()
relevant_op_data_row = (
history_df.filter(history_df["job"]["jobName"] == curr_job_name)
.filter(history_df["operation"].isin(["STREAMING UPDATE", "WRITE"]))
.sort(history_df.timestamp.desc())
.collect()
)
if relevant_op_data_row:
last_op_data_row = relevant_op_data_row[0]
operation_metrics = last_op_data_row[12]
count = operation_metrics.get("numOutputRows", 0)
bytes_size = operation_metrics.get("numOutputBytes", 0)
return count, bytes_size
在作业集群中运行收集作业时,此代码非常有效。 当我在通用集群上运行此函数时,该函数返回 0,0,因为函数
__get_job_name
返回 None
。
知道如何从通用集群中提取作业名称吗?
我知道此操作存在此 jobName(在通用集群上执行),因为当我为此表运行
describe history
时,我正在获取带有 jobName 的数据。
谢谢
我们使用将值引用发送到作业的参数解决了这个问题。
请注意,Databricks 中有一个错误,这应该可以工作,但不能:
{{job.id}}
我们正在使用已弃用的值:
{{job_id}}