解决方案基于这篇文章:https://www.redhat.com/en/blog/why-spark-ceph-part-1-3
设置
到目前为止,我已经在远程服务器(服务器 A)上设置了一个 Ceph 集群,并复制了一些 sqlite3 文件。 我还在另一台远程服务器(服务器 B)上设置了一个独立的 Spark 集群。
由于公司安全政策(防火墙等),我无法直接从本地机器运行 Spark 作业,所以我不得不在 linux 服务器(我们称之为服务器 C)上设置远程开发环境。
所以我可以使用我的 IDE 从我的本地笔记本电脑运行代码,代码在服务器 C 上自动同步和执行,并将作业发送到 Spark 集群(服务器 B)——所有这些工作得很好.
本场景数据位于执行代码的服务器C上
问题
现在,我不想查询代码所在的数据(服务器 C),而是想读取存储在 CEPH(服务器 A)上的文件——或者更准确地说,我想使用 Spark 远程查询 sqlite3 文件。
错误
注:
py4j.protocol.Py4JJavaError: An error occurred while calling o51.load. : java.sql.SQLException: path to 'ceph://<access_key_id>:<access_key>@<hostname>/<bucket>/<path>/<filename>.sqlite': '/tmp/pycharm_project_162/ceph:' does not exist
代码
import os
from pyspark.sql import SparkSession
print("+++++ BUILDING SPARK SESSION +++++")
spark = SparkSession.builder \
.appName("Load SQLite file") \
.master("spark://<spark_host_on_server_B>:<port>") \
.config("spark.jars", "{}/sqlite-jdbc-3.41.0.0.jar".format(os.getcwd())) \
.config('spark.driver.extraClassPath', "{}/sqlite-jdbc-3.41.0.0.jar".format(os.getcwd())) \
.config('spark.executor.extraClassPath', "{}/sqlite-jdbc-3.41.0.0.jar".format(os.getcwd())) \
.config("spark.shuffle.service.enabled", "false") \
.config("spark.dynamicAllocation.enabled", "false") \
.getOrCreate()
print("+++++ CONNECTING TO SQLLITE +++++")
df = spark.read.format("jdbc") \
.option("url", "jdbc:sqlite:ceph://<access_key_id>:<access_key>@<hostname>/<bucket>/<path>/<filename>.sqlite") \
.option("driver", "org.sqlite.JDBC") \
.option("dbtable", "data") \
.load()
print("+++++ DISPLAYING DATA +++++")
df.show()
结论
我已经尝试过各种不同的方法来做到这一点,但没有成功。我在这里只发布了一个代码示例。 sqlite3 JDBC 也存在于“jars/”下的 Spark 安装中(here) 这是连接到 Ceph 集群的正确语法吗?还有其他提示吗?例如。主机名应该包含“https://”吗?
我会根据需要发布更多详细信息,请随时提出任何要求。
您引用的文章讨论了使用 Ceph 作为 Spark 集群的存储后端,但是您的代码表明您正在尝试使用 SQLite 文件作为现有 Spark 集群的输入。
Ceph 不是协议;相反,Ceph 支持许多协议,例如 RBD、HTTP[S] 等……如果您只有一台服务器,Ceph 就不太适合,因为您没有任何持久性或冗余。 Ceph 被设计为至少使用 3 台服务器,通常使用几十台服务器。如果你需要一个 S3 兼容的 API(即对象存储),你可以使用像 Minio 这样的东西。
如果您打算使用 Ceph,根据您的目标,您可能希望使用 RBD 装载数据,使用 Ceph 的对象存储(又名 S3/HTTPS)API,或者如果您需要锁定,您可能想尝试一下 libcephsqlite对于有警告的多个客户:
数据库可以由多个客户端安全地操作,仅以串行方式由 Ceph SQLite VFS 管理的 RADOS 锁控制。
如果您要有多个客户端访问,您可能需要重新考虑 SQLite 的选择。使用,因为只要另一个客户端访问数据,客户端就会被阻止。
问题最终是 Spark 集群上版本错误的 .jar。
我按照本指南解决了我的问题: https://bigdatagurus.wordpress.com/2021/08/02/include-aws-s3-libraries-in-pyspark/