How to fix the .load() error in PySpark when connecting to an Azure SQL database

Question · Votes: 0 · Answers: 1

I am getting an error when loading data from an Azure SQL database using PySpark in Python. Can anyone correct this? All of my connection details are correct, because the code works if I remove the .load() call, so the problem is with PySpark's .load() function. However, if I use a local database such as MySQL, .load() works and loads the full data.

I am using the PyCharm IDE.

With this code, I expect that when I run it, it connects to the Azure SQL database and fetches the records from the given table into the variable azure_df as a PySpark DataFrame.

def load_azure(spark, table_name, server, username, password, database):
    source_properties = {
        "driver": 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
        "url": f'jdbc:sqlserver://{server}:1433;database={database};user={username};password={password};',
        "user": username,
        "password": password,
        "dbtable": table_name
    }
    azure_df = spark.read \
        .format("jdbc") \
        .option("url", source_properties["url"]) \
        .option("dbtable", source_properties["dbtable"]) \
        .option("inferSchema", "True")
    azure_df = azure_df.load()
    print('connected before')
    print(azure_df.printSchema())
    print(azure_df.show())
    print(azure_df)
    print('connected')
    return azure_df

Here is the error:

File "D:\Aryan\python\pythonProject\test.py", line 61, in <module>
    data = load_azure(spark, table_name1, server1, username1, password1, database1)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\Aryan\python\pythonProject\test.py", line 48, in load_azure
    azure_df = azure_df.load()
               ^^^^^^^^^^^^^^^
  File "C:\Spark\spark-3.5.0-bin-hadoop3\python\lib\pyspark.zip\pyspark\sql\readwriter.py", line 314, in load
  File "C:\Spark\spark-3.5.0-bin-hadoop3\python\lib\py4j-0.10.9.7-src.zip\py4j\java_gateway.py", line 1322, in __call__
  File "C:\Spark\spark-3.5.0-bin-hadoop3\python\lib\pyspark.zip\pyspark\errors\exceptions\captured.py", line 179, in deco
  File "C:\Spark\spark-3.5.0-bin-hadoop3\python\lib\py4j-0.10.9.7-src.zip\py4j\protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o29.load.
: java.sql.SQLException: No suitable driver
    at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:300)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$2(JDBCOptions.scala:109)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:109)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:41)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:34)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
    at java.base/java.lang.reflect.Method.invoke(Method.java:580)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:1583)

pyspark apache-spark-sql azure-sql-database
1 Answer
0 votes
py4j.protocol.Py4JJavaError: An error occurred while calling o29.load.
: java.sql.SQLException: No suitable driver

The error message indicates a problem loading the JDBC driver. This usually happens when the driver is not on the classpath. Since you mention that it works with a local MySQL database but not with Azure SQL Database, the JDBC driver for Azure SQL Database (Microsoft's SQL Server driver) is probably missing or not configured correctly.

Download the Microsoft JDBC Driver for SQL Server and add its jar to Spark's classpath (for example via the spark.jars configuration or the --jars option of spark-submit). You will then be able to connect to the Azure SQL database without errors, as shown below:
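A minimal sketch of putting the driver jar on the classpath when building the SparkSession. The jar path and version shown here are assumptions; substitute the file you actually downloaded (or, if your machine has internet access, let Spark fetch it from Maven via spark.jars.packages):

```python
from pyspark.sql import SparkSession

# Assumed path/version of the downloaded Microsoft JDBC driver jar --
# adjust to match your actual download.
mssql_jar = r"C:\Spark\jars\mssql-jdbc-12.6.1.jre11.jar"

spark = (
    SparkSession.builder
    .appName("azure-sql-load")
    # Puts the driver jar on the driver and executor classpaths,
    # so DriverManager can find com.microsoft.sqlserver.jdbc.SQLServerDriver.
    .config("spark.jars", mssql_jar)
    # Alternative (no manual download needed): resolve from Maven Central.
    # .config("spark.jars.packages", "com.microsoft.sqlserver:mssql-jdbc:12.6.1.jre11")
    .getOrCreate()
)
```

The equivalent when launching from the command line is passing the jar to spark-submit, e.g. `spark-submit --jars C:\Spark\jars\mssql-jdbc-12.6.1.jre11.jar test.py`.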

def load_azure(spark, table_name, server, username, password, database):
    source_properties = {
        "driver": 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
        "url": f'jdbc:sqlserver://{server}:1433;database={database};user={username};password={password};',
        "user": username,
        "password": password,
        "dbtable": table_name
    }
    azure_df = spark.read \
        .format("jdbc") \
        .option("url", source_properties["url"]) \
        .option("driver", source_properties["driver"]) \
        .option("dbtable", source_properties["dbtable"]) \
        .load()
    print('connected before')
    # printSchema() and show() print directly and return None,
    # so they should not be wrapped in print().
    azure_df.printSchema()
    azure_df.show()
    print('connected')
    return azure_df

table_name = "student"
server = "<serverName>.database.windows.net"
username = "<userName>"
password = "<password>"
database = "<dbName>"
azure_df = load_azure(spark, table_name, server, username, password, database)

Result:

[screenshot: the table's records displayed as a PySpark DataFrame]

You can refer to this for more information.
