使用 Spark BQ 连接器从 Dataproc 上的 Pyspark 作业向大查询表添加标签

问题描述 投票:0回答:2

我正在尝试在 google dataproc 集群上使用 py-spark 来运行 Spark 作业并将结果写入大查询表。

Spark Bigquery 连接器文档 - https://github.com/GoogleCloudDataproc/spark-bigquery-connector

要求是在创建表的过程中,大查询表上应该存在某些标签。

spark bq 连接器不提供任何为写入操作添加标签的规定

df.write.format("bigquery") \
    .mode("overwrite") \
    .option("temporaryGcsBucket", "tempdataprocbqpath") \
    .option("createDisposition", "CREATE_IF_NEEDED") \
    .save("abc.tg_dataset_1.test_table_with_labels")

上面的命令在后台创建 bigquery 加载作业,该作业加载带有数据的表。 进一步检查后发现,与大查询 - 查询作业相比,大查询加载作业语法本身不支持添加标签。

有没有计划支持以下

  1. 支持大查询加载作业中的标签
  2. 支持spark bq连接器写入操作中的标签。

由于没有在加载/写入操作期间添加标签的规定,当前使用的解决方法是在 pyspark 作业之前使用架构/标签创建表

google-bigquery google-cloud-dataproc
2个回答
0
投票

问题发布于 10 个月前。我不确定它当时是否可用或最近添加,但相同的 github 文档提供了添加标签的选项:-

bigQueryTableLabel :- 可用于在写入表时向表添加标签。可以设置多个标签。 (可选)

因此您可以在代码中添加类似的内容:-

spark.conf.set("bigQueryTableLabel.name", "sample_table_name")


0
投票

这个问题很久以前就已经发布了。但是用以下信息回答。它将帮助开发人员获得每个 Spark 作业的插槽使用情况。

我编写了下面的一段代码来捕获每个 Spark 作业的 bigquery 槽使用情况。 您可以使用下面的补丁,其中您通过 Spark 作业执行查询。

 def getDataprocJobId(spark: SparkSession): String = {
    spark.sparkContext.getConf.get(KEY_SPARK_YARN_TAGS, UNKNOWN).split(',')
      .find(_.startsWith(DATAPROC_JOB_PREFIX))
      .map(_.substring(DATAPROC_JOB_PREFIX.length))
      .getOrElse(spark.sparkContext.appName)
  }

val bld = QueryJobConfiguration.newBuilder(sql)
val dataprocjobid: String = getDataprocJobId(spark)
  var dataproc_job_id: String = {
    if (dataprocjobid.startsWith("exp")) {
      val pattern= "exp\\-[a-zA-Z0-9-](.*)(?=-EXPORT)".r
      pattern.findFirstIn(dataprocjobid).toString
    }
    else {
      dataprocjobid
    }
  }

import scala.collection.JavaConverters._
bld.setLabels(Map("dataproc_job_id" -> dataproc_job_id.toLowerCase).asJava)

**Its working fine and I can see the labels inside information_schema table.**

select
 query
,TIMESTAMP_DIFF(start_time, end_time, MINUTE) as diff
,total_bytes_processed
,total_slot_ms
,cache_hit
,lbl.value as dataproc_job_id
from `region-eu.INFORMATION_SCHEMA.JOBS_BY_PROJECT` ,unnest(labels) lbl
where date(creation_time) >='2024-04-19'
and lbl.key='dataproc_job_id' limit 10;

但是,它也有一定的局限性。

1)Label keys and values can be no longer than 63 characters
2)can only contain lowercase letters, numeric characters, underscores and dashes. International characters are allowed. 
3)Label keys must start with a letter and each label in the list must have a different key.

对于我们的系统来说,很少有 Spark 作业的作业 ID 超过 63 的限制,因此我们使用正则表达式从中获取有意义的信息。

© www.soinside.com 2019 - 2024. All rights reserved.