我正在尝试在 google dataproc 集群上使用 py-spark 来运行 Spark 作业并将结果写入大查询表。
Spark Bigquery 连接器文档 - https://github.com/GoogleCloudDataproc/spark-bigquery-connector
要求是在创建表的过程中,大查询表上应该存在某些标签。
spark bq 连接器不提供任何为写入操作添加标签的规定
df.write.format("bigquery") \
.mode("overwrite") \
.option("temporaryGcsBucket", "tempdataprocbqpath") \
.option("createDisposition", "CREATE_IF_NEEDED") \
.save("abc.tg_dataset_1.test_table_with_labels")
上面的命令在后台创建 bigquery 加载作业,该作业加载带有数据的表。 进一步检查后发现,与大查询 - 查询作业相比,大查询加载作业语法本身不支持添加标签。
有没有计划支持以下
由于没有在加载/写入操作期间添加标签的规定,当前使用的解决方法是在 pyspark 作业之前使用架构/标签创建表
问题发布于 10 个月前。我不确定它当时是否可用或最近添加,但相同的 github 文档提供了添加标签的选项:-
bigQueryTableLabel :- 可用于在写入表时向表添加标签。可以设置多个标签。 (可选)
因此您可以在代码中添加类似的内容:-
spark.conf.set("bigQueryTableLabel.name", "sample_table_name")
这个问题很久以前就已经发布了。但是用以下信息回答。它将帮助开发人员获得每个 Spark 作业的插槽使用情况。
我编写了下面的一段代码来捕获每个 Spark 作业的 bigquery 槽使用情况。 您可以使用下面的补丁,其中您通过 Spark 作业执行查询。
def getDataprocJobId(spark: SparkSession): String = {
spark.sparkContext.getConf.get(KEY_SPARK_YARN_TAGS, UNKNOWN).split(',')
.find(_.startsWith(DATAPROC_JOB_PREFIX))
.map(_.substring(DATAPROC_JOB_PREFIX.length))
.getOrElse(spark.sparkContext.appName)
}
val bld = QueryJobConfiguration.newBuilder(sql)
val dataprocjobid: String = getDataprocJobId(spark)
var dataproc_job_id: String = {
if (dataprocjobid.startsWith("exp")) {
val pattern= "exp\\-[a-zA-Z0-9-](.*)(?=-EXPORT)".r
pattern.findFirstIn(dataprocjobid).toString
}
else {
dataprocjobid
}
}
import scala.collection.JavaConverters._
bld.setLabels(Map("dataproc_job_id" -> dataproc_job_id.toLowerCase).asJava)
**Its working fine and I can see the labels inside information_schema table.**
select
query
,TIMESTAMP_DIFF(start_time, end_time, MINUTE) as diff
,total_bytes_processed
,total_slot_ms
,cache_hit
,lbl.value as dataproc_job_id
from `region-eu.INFORMATION_SCHEMA.JOBS_BY_PROJECT` ,unnest(labels) lbl
where date(creation_time) >='2024-04-19'
and lbl.key='dataproc_job_id' limit 10;
但是,它也有一定的局限性。
1)Label keys and values can be no longer than 63 characters
2)can only contain lowercase letters, numeric characters, underscores and dashes. International characters are allowed.
3)Label keys must start with a letter and each label in the list must have a different key.
对于我们的系统来说,很少有 Spark 作业的作业 ID 超过 63 的限制,因此我们使用正则表达式从中获取有意义的信息。