我在 BigQuery 中运行以下 PySpark 存储过程：
# Build (or reuse) the SparkSession for this stored procedure.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("work_with_sql").getOrCreate()
# NOTE(review): spark.sql() resolves table names against Spark's own
# catalog (temp views / metastore), not against BigQuery. The BigQuery
# table below is not registered as a Spark table or view, which is why
# this call raises the quoted AnalysisException ("Table or view not
# found") instead of running the query.
df = spark.sql(
'''
SELECT tag, COUNT(*) c
FROM (
SELECT SPLIT(tags, '|') tags
FROM `bigquery-public-data.stackoverflow.posts_questions` a
WHERE EXTRACT(YEAR FROM creation_date)>=2024
), UNNEST(tags) tag
GROUP BY 1
ORDER BY 2 DESC
LIMIT 10
'''
)
# Print the top-10 tags (never reached — spark.sql above raises first).
df.show()
spark.stop()
但它在日志中报告了如下错误：
pyspark.sql.utils.AnalysisException: Table or view not found: `bigquery-public-data.stackoverflow.posts_questions`; line 5 pos 9;
如何在 PySpark 存储过程中使用 spark.sql？
`spark.sql` 只能查询已注册到 Spark 目录（临时视图、元数据库表）中的表或视图，不能直接解析 BigQuery 表名。要通过 BigQuery SQL 加载数据，您可以使用：
# Let the Spark BigQuery connector accept arbitrary SQL: the query runs
# inside BigQuery and its result is materialized into a temporary table
# under <dataset>, which Spark then reads.
spark.conf.set("viewsEnabled", "true")
spark.conf.set("materializationDataset", "<dataset>")
# Top-10 Stack Overflow tags. Kept consistent with the question's query,
# which filters creation_date >= 2024 (the original answer said 2014).
sql = """
SELECT tag, COUNT(*) c
FROM (
SELECT SPLIT(tags, '|') tags
FROM `bigquery-public-data.stackoverflow.posts_questions` a
WHERE EXTRACT(YEAR FROM creation_date)>=2024
), UNNEST(tags) tag
GROUP BY 1
ORDER BY 2 DESC
LIMIT 10
"""
# Pass the SQL text directly to load(); requires viewsEnabled above.
df = spark.read.format("bigquery").load(sql)
df.show()
下面这样的完整代码可以正常工作：
def main():
    """Load rows from a BigQuery table via the Spark BigQuery connector and print them.

    Relies on the module-level ``spark`` session (created in the
    ``__main__`` block) having ``viewsEnabled``/``materializationDataset``
    set so that raw SQL can be passed to ``load()``.
    """
    # BUG FIX: the original template read `{TABLE_NAE}` — corrected to
    # `{TABLE_NAME}`. Placeholders are meant to be substituted by the user.
    sql = 'select date,ctr from `{PROJECT_NAME}.{DATA_SET_NAME}.{TABLE_NAME}`'
    # The connector executes the SQL in BigQuery and reads back the
    # materialized result as a DataFrame.
    df = spark.read \
        .format("bigquery") \
        .load(sql)
    df.show()
    # Stop the SparkSession so the job releases its resources.
    spark.stop()
if __name__ == "__main__":
    from pyspark import SparkConf
    from pyspark.sql import SparkSession

    # Initialize the Spark session with the BigQuery connector settings.
    conf = (
        SparkConf()
        .setAppName("spark-bq")  # plain literal: the original f-string had no placeholders
        .set("parentProject", '{PROJECT_NAME}')
        .set("credentialsFile", '/home/admin/tso_test/truemetrics-lilysilk-cdp-3efdaf581a57.json')
        .set("materializationDataset", "{DATA_SET_NAME}")
        # SparkConf values are strings — use "true" instead of the Python bool True.
        .set("viewsEnabled", "true")
    )
    spark = (
        SparkSession.builder.enableHiveSupport()
        .config(conf=conf)
        .getOrCreate()
    )
    main()