How to run a Standard SQL query in a BigQuery PySpark stored procedure

Question · Votes: 0 · Answers: 2

I am running the following PySpark stored procedure in BigQuery:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("work_with_sql").getOrCreate()

df = spark.sql(
'''
SELECT tag, COUNT(*) c
FROM (
     SELECT SPLIT(tags, '|') tags
     FROM `bigquery-public-data.stackoverflow.posts_questions` a
     WHERE EXTRACT(YEAR FROM creation_date)>=2024
     ), UNNEST(tags) tag
GROUP BY 1
ORDER BY 2 DESC
LIMIT 10
'''
)

df.show()
spark.stop()

But the log reports:

pyspark.sql.utils.AnalysisException: Table or view not found: `bigquery-public-data.stackoverflow.posts_questions`; line 5 pos 9;

How can I use spark.sql in a PySpark stored procedure?

apache-spark pyspark stored-procedures apache-spark-sql google-bigquery
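2 Answers
0 votes

spark.sql queries data within the Spark context, so it only resolves tables and views registered in Spark's own catalog. That is why the BigQuery table name is reported as not found. To load the result of a BigQuery query instead, you can use: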

spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationDataset","<dataset>")

sql = """
  SELECT tag, COUNT(*) c
  FROM (
    SELECT SPLIT(tags, '|') tags
    FROM `bigquery-public-data.stackoverflow.posts_questions` a
    WHERE EXTRACT(YEAR FROM creation_date)>=2014
  ), UNNEST(tags) tag
  GROUP BY 1
  ORDER BY 2 DESC
  LIMIT 10
  """
df = spark.read.format("bigquery").load(sql)
df.show()

More details in the documentation: https://github.com/GoogleCloudDataproc/spark-bigquery-connector?tab=readme-ov-file#reading-data-from-a-bigquery-query
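If the goal is specifically to keep using spark.sql, one option is to load the table through the connector, register it as a temporary view, and rewrite the query in Spark SQL. The following is a minimal sketch, not a tested stored procedure: the function name `top_tags_with_spark_sql`, the view name `posts_questions`, and the parameters are hypothetical, and it assumes the connector's `table` read option is available in your environment. Note that `UNNEST(SPLIT(...))` becomes `explode(split(...))`, and that Spark's `split` takes a regular expression, so the pipe is written as `[|]`.

```python
def top_tags_with_spark_sql(
    spark,
    table="bigquery-public-data.stackoverflow.posts_questions",
    view_name="posts_questions",  # hypothetical view name, chosen for illustration
    min_year=2024,
    limit=10,
):
    """Load a BigQuery table, expose it as a temp view, and query it with spark.sql."""
    # Read the table through the spark-bigquery connector rather than
    # naming it inside spark.sql, where Spark's catalog cannot resolve it.
    posts = spark.read.format("bigquery").option("table", table).load()

    # Register the DataFrame so spark.sql can see it under view_name.
    posts.createOrReplaceTempView(view_name)

    # Spark SQL dialect: explode(split(...)) stands in for UNNEST(SPLIT(...)),
    # and year(...) stands in for EXTRACT(YEAR FROM ...).
    query = f"""
        SELECT tag, COUNT(*) AS c
        FROM (
            SELECT explode(split(tags, '[|]')) AS tag
            FROM {view_name}
            WHERE year(creation_date) >= {min_year}
        ) AS t
        GROUP BY tag
        ORDER BY c DESC
        LIMIT {limit}
    """
    return spark.sql(query)
```

Usage would look like `top_tags_with_spark_sql(spark).show()` with an existing SparkSession. With this approach the filtering and aggregation run in Spark rather than in BigQuery, so for large tables the query-based load shown in the answer above may move less data.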


0 votes

Code like this works:

def main():
    sql = 'select date,ctr from `{PROJECT_NAME}.{DATA_SET_NAME}.{TABLE_NAME}`'

    df = spark.read \
        .format("bigquery") \
        .load(sql)
    df.show()
    # Stop the SparkSession
    spark.stop()


if __name__ == "__main__":
    from pyspark import SparkConf
    from pyspark.sql import SparkSession

    # Initialize the Spark configuration
    conf = SparkConf(). \
        setAppName("spark-bq"). \
        set("parentProject", '{PROJECT_NAME}'). \
        set("credentialsFile", '/home/admin/tso_test/truemetrics-lilysilk-cdp-3efdaf581a57.json'). \
        set("materializationDataset", "{DATA_SET_NAME}"). \
        set("viewsEnabled", True)

    spark = SparkSession.builder.enableHiveSupport(). \
        config(conf=conf).getOrCreate()

    main()
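
Both answers set the connector options on the session and pass the SQL text to load(). As a variation, the same settings can be attached to a single read. The sketch below is illustrative only: the helper name `read_bigquery_query` is hypothetical, and it assumes a connector version that accepts the `query`, `viewsEnabled`, and `materializationDataset` read options (if yours does not, keep the load(sql) form used above).

```python
def read_bigquery_query(spark, sql, materialization_dataset):
    """Run a BigQuery SQL query through the connector and return the DataFrame."""
    # The connector materializes query results into a dataset, so a dataset
    # name is needed; fail early instead of failing inside the Spark job.
    if not materialization_dataset:
        raise ValueError("materialization_dataset must be a non-empty dataset name")

    reader = (
        spark.read.format("bigquery")
        # Assumed per-read equivalents of the spark.conf settings used above.
        .option("viewsEnabled", "true")
        .option("materializationDataset", materialization_dataset)
        # Assumption: the connector version in use supports the "query" option.
        .option("query", sql)
    )
    return reader.load()
```

A call such as `read_bigquery_query(spark, sql, "<dataset>").show()` would then replace the explicit spark.conf.set lines, with `<dataset>` filled in with a dataset you can write to.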