Overriding the BigQuery project ID with PySpark

Question · 1 vote · 1 answer

I am using BigQuery and Dataproc in Google Cloud. Both live in the same project, let's call it "project-123". I run my code with Composer (Airflow).

I have a simple Python script, test_script.py, that uses PySpark to read a table from a BigQuery public dataset:

import logging
import warnings

from pyspark.sql import SparkSession

log = logging.getLogger(__name__)

if __name__ == "__main__":
    # Create (or reuse) the Spark session
    try:
        spark = SparkSession.builder.appName("test_script").getOrCreate()
        log.info("Created a SparkSession")
    except ValueError:
        warnings.warn("SparkSession already exists in this scope")

    # Read a table from the BigQuery public dataset
    df = (
        spark.read.format("bigquery")
        .option("project", "project-123")
        .option("dataset", "bigquery-public-data")
        .option("table", "crypto_bitcoin.outputs")
        .load()
    )

I run the script with the DataProcPySparkOperator in Airflow:

    # Submit test_script.py to the Dataproc cluster
    test_script_task = DataProcPySparkOperator(
        task_id="test_script",
        main="./test_script.py",
        cluster_name="test_script_cluster",
        arguments=[],

        # Since we are using BigQuery, we need to explicitly add the connector jar
        dataproc_pyspark_jars="gs://spark-lib/bigquery/spark-bigquery-latest.jar",
    )

However, every attempt fails with the following error:

Invalid project ID '/tmp/test_script_20200304_407da59b/test_script.py'. Project IDs must contain 6-63 lowercase letters, digits, or dashes. Some project IDs also include domain name separated by a colon. IDs must start with a letter and may not end with a dash.

Where does this project ID come from? My .option("project", "project-123") clearly isn't overriding it. My guess is that Composer stores my Spark job script at /tmp/test_script_20200304_407da59b/test_script.py. If that is the case, how can I override the project ID?

Any help is greatly appreciated.

python pyspark google-bigquery google-cloud-dataproc google-cloud-composer
1 Answer

0 votes

I'm afraid you are mixing up the parameters: project is the project the table belongs to, and bigquery-public-data is a project, not a dataset. Please try the following call:

df = (
    spark.read.format("bigquery")
    .option("parentProject", "project-123")
    .option("project", "bigquery-public-data")
    .option("table", "crypto_bitcoin.outputs")
    .load()
)
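
As a sketch (not part of the original answer), the spark-bigquery connector also accepts a fully-qualified "project.dataset.table" reference, so the same read can be written with a single table string while parentProject still names the project that is billed for the read:

    # Sketch: fully-qualified table form; "project-123" is assumed to be the
    # billed (parent) project, as in the question.
    df = (
        spark.read.format("bigquery")
        .option("parentProject", "project-123")
        .load("bigquery-public-data.crypto_bitcoin.outputs")
    )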