spark-submit 作业出现问题,当我使用此脚本运行
spark-submit --jars /path-to-this/postgresql-42.7.1.jar /path-to-this/large-scale-data-processing/src/etl/load.py
时,
import os
from dotenv import load_dotenv
from spark_handler import SparkHandler
class DataLoader(object):
def __init__(self):
load_dotenv()
# Register PostgreSQL JDBC driver
self.spark = SparkHandler.create_session()
self.jdbc_url = "jdbc:postgresql://127.0.0.1:5432/taxi_data"
self.table_name = "taxi_trips"
self.properties = {
"user_name": "taxi_driver",
"password": "yellowcab"
}
def create_table(self):
query = """
CREATE TABLE IF NOT EXISTS taxi_trips (
VendorID INTEGER,
tpep_pickup_datetime TIMESTAMP,
tpep_dropoff_datetime TIMESTAMP,
passenger_count INTEGER,
trip_distance DECIMAL,
RatecodeID INTEGER,
store_and_fwd_flag VARCHAR(1),
PULocationID INTEGER,
DOLocationID INTEGER,
payment_type INTEGER,
fare_amount DECIMAL,
extra DECIMAL,
mta_tax DECIMAL,
tip_amount DECIMAL,
tolls_amount DECIMAL,
improvement_surcharge DECIMAL,
total_amount DECIMAL,
congestion_surcharge DECIMAL,
Airport_fee DECIMAL,
company_name VARCHAR(255)
);
"""
(
self.spark
.sql(query)
.write
.format("jdbc")
.option("url", self.jdbc_url)
.option("dbtable", "taxi_data.taxi_trips")
.option("user", self.properties["user_name"])
.option("password", self.properties["password"])
.save()
)
def load_data_into_dw(self):
df = self.spark.read.parquet(os.getenv("DATA_PATH") \
+ "/large-scale-data-processing/data/output/joined_table_a")
(
df.write.format("jdbc")
.option("url", self.jdbc_url)
.option("dbtable", "taxi_data.taxi_trips")
.option("user", self.properties.get('user_name'))
.option("password", self.properties.get('password'))
.save()
)
print("Data loaded to dw!")
if __name__ == "__main__":
dl = DataLoader()
dl.create_table()
dl.load_data_into_dw()
from pyspark.sql import SparkSession
class SparkHandler(object):
"""
Class takes care of starting up sparksesh
"""
spark: SparkSession = None
@classmethod
def create_session(cls) -> SparkSession:
if cls.spark is None:
cls.spark = (
SparkSession
.builder
.appName("my_spark_app")
.config("spark.sql.catalogImplementation", "hive")
.config("spark.driver.userClassPathFirst", "true")
.getOrCreate()
)
return cls.spark
终端的输出是没有合适的驱动程序。
py4j.protocol.Py4JJavaError: An error occurred while calling o41.save.
: java.sql.SQLException: No suitable driver
at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.
...
...
...
我尝试过几种不同的方式来调试它:
检查了明显的“合适的驱动程序”,但我有java 11并且正在运行postgres 16.1,我使用的驱动程序是postgresql-42.7.1
我已经检查了代码中的错误,但网址、密码、用户名等似乎都是正确的。
我查找了所有防火墙并查看了 psql conf 文件是否存在任何阻塞,但没有发现任何阻塞。
我期望简单地将一些数据加载到 psql 中来测试我编写的一些 Spark 代码。任何帮助将不胜感激。干杯!
已解决:
阅读此文档:https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
出于某种奇怪的原因,我尝试的 postgres 的 JDBC 版本无法正常工作,尽管我的 Java 安装和 Postgres 的版本似乎与 42.7.1 匹配。所以,然后,我去了 Maven 存储库,找到了 Spark 文档中指定的版本,结果它可以工作了。