PySpark 作业找不到“合适的驱动程序”

问题描述 投票:0回答:1

spark-submit 作业出现问题,当我使用此脚本运行

spark-submit --jars /path-to-this/postgresql-42.7.1.jar /path-to-this/large-scale-data-processing/src/etl/load.py 
时,

import os
from dotenv import load_dotenv

from spark_handler import SparkHandler

class DataLoader(object):

    def __init__(self):
        load_dotenv()
        # Register PostgreSQL JDBC driver
        self.spark = SparkHandler.create_session()
        self.jdbc_url = "jdbc:postgresql://127.0.0.1:5432/taxi_data"
        self.table_name = "taxi_trips"
        self.properties = {
        "user_name": "taxi_driver",
        "password": "yellowcab"
        }

    def create_table(self):

        query = """
            CREATE TABLE IF NOT EXISTS taxi_trips (
                VendorID INTEGER,
                tpep_pickup_datetime TIMESTAMP,
                tpep_dropoff_datetime TIMESTAMP,
                passenger_count INTEGER,
                trip_distance DECIMAL,
                RatecodeID INTEGER,
                store_and_fwd_flag VARCHAR(1),
                PULocationID INTEGER,
                DOLocationID INTEGER,
                payment_type INTEGER,
                fare_amount DECIMAL,
                extra DECIMAL,
                mta_tax DECIMAL,
                tip_amount DECIMAL,
                tolls_amount DECIMAL,
                improvement_surcharge DECIMAL,
                total_amount DECIMAL,
                congestion_surcharge DECIMAL,
                Airport_fee DECIMAL,
                company_name VARCHAR(255)
            );
        """

        (

        self.spark
        .sql(query)
        .write
        .format("jdbc")
        .option("url", self.jdbc_url)
        .option("dbtable", "taxi_data.taxi_trips")
        .option("user", self.properties["user_name"])
        .option("password", self.properties["password"])
        .save()
        )

    def load_data_into_dw(self):
        df = self.spark.read.parquet(os.getenv("DATA_PATH") \
            + "/large-scale-data-processing/data/output/joined_table_a")

        (
        df.write.format("jdbc")
        .option("url", self.jdbc_url)
        .option("dbtable", "taxi_data.taxi_trips")
        .option("user", self.properties.get('user_name'))
        .option("password", self.properties.get('password'))
        .save()
        )

        print("Data loaded to dw!")

if __name__ == "__main__":
    dl = DataLoader()
    dl.create_table()
    dl.load_data_into_dw()
from pyspark.sql import SparkSession

class SparkHandler(object):
    """
    Class takes care of starting up sparksesh
    """
    spark: SparkSession = None

    @classmethod
    def create_session(cls) -> SparkSession:

        if cls.spark is None:
            cls.spark = (
                SparkSession
                .builder
                .appName("my_spark_app")
                .config("spark.sql.catalogImplementation", "hive")
                .config("spark.driver.userClassPathFirst", "true")
                .getOrCreate()
                )
        return cls.spark

终端的输出是没有合适的驱动程序。

py4j.protocol.Py4JJavaError: An error occurred while calling o41.save.
: java.sql.SQLException: No suitable driver
    at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.
        ...
        ...
        ...

我尝试过几种不同的方式来调试它:

  1. 检查了明显的“合适的驱动程序”,但我有java 11并且正在运行postgres 16.1,我使用的驱动程序是postgresql-42.7.1

  2. 我已经检查了代码中的错误,但网址、密码、用户名等似乎都是正确的。

  3. 我查找了所有防火墙并查看了 psql conf 文件是否存在任何阻塞,但没有发现任何阻塞。

我期望简单地将一些数据加载到 psql 中来测试我编写的一些 Spark 代码。任何帮助将不胜感激。干杯!

exception pyspark jdbc postgresql-16
1个回答
0
投票

已解决:

阅读此文档:https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

出于某种奇怪的原因,我尝试的 postgres 的 JDBC 版本无法正常工作,尽管我的 Java 安装和 Postgres 的版本似乎与 42.7.1 匹配。所以,然后,我去了 Maven 存储库,找到了 Spark 文档中指定的版本,结果它可以工作了。

© www.soinside.com 2019 - 2024. All rights reserved.