使用Databricks通用集群获取spark作业名称

问题描述 投票:0回答:1

我正在构建从各种远程系统收集数据到中央 Spark 集群的系统。我正在使用 Delta 格式保存本地数据。

收集后,我想从 Delta 的日志中获取收集的行数。 我正在使用这段代码:

def __get_job_name(self) -> str:
    all_cluster_tags = {}
    for tag in json.loads(
        self.spark.conf.get("spark.databricks.clusterUsageTags.clusterAllTags")
    ):
        all_cluster_tags[tag["key"]] = tag["value"]

    return all_cluster_tags.get("RunName")

def __extract_delta_operation_data(self) -> tuple[int, int]:
    delta_table = DeltaTable.forPath(self.spark, self.target_table_path)
    history_df = delta_table.history()
    count, bytes_size = 0, 0

    if not is_local_spark():
        curr_job_name = self.__get_job_name()

        relevant_op_data_row = (
            history_df.filter(history_df["job"]["jobName"] == curr_job_name)
            .filter(history_df["operation"].isin(["STREAMING UPDATE", "WRITE"]))
            .sort(history_df.timestamp.desc())
            .collect()
        )
        if relevant_op_data_row:
            last_op_data_row = relevant_op_data_row[0]
            operation_metrics = last_op_data_row[12]
            count = operation_metrics.get("numOutputRows", 0)
            bytes_size = operation_metrics.get("numOutputBytes", 0)

    return count, bytes_size

在作业集群中运行收集作业时,此代码非常有效。 当我在通用集群上运行此函数时,该函数返回 0,0,因为函数

__get_job_name
返回
None

知道如何从通用集群中提取作业名称吗?

我知道此操作存在此 jobName(在通用集群上执行),因为当我为此表运行

describe history
时,我正在获取带有 jobName 的数据。

谢谢

apache-spark databricks
1个回答
0
投票

我们使用将值引用发送到作业的参数解决了这个问题。

https://docs.databricks.com/en/workflows/jobs/parameter-value-references.html#supported-value-references

请注意,Databricks 中有一个错误,这应该可以工作,但不能:

{{job.id}}

我们正在使用已弃用的值:

{{job_id}}

© www.soinside.com 2019 - 2024. All rights reserved.