I feel like I'm missing something trivial here; please help.
The setup is based on these articles:
I have set up a Ceph cluster on a remote server (server A) and copied some sqlite3 files onto it. I have also set up a standalone Spark cluster on another remote server (server B).
Because of company security policy (firewalls etc.), I cannot run Spark jobs directly from my local machine, so I had to set up a remote development environment on a Linux server (call it server C).
That way I can run code from my IDE on my local laptop; the code is automatically synced to and executed on server C, which submits the job to the Spark cluster (server B), which in turn should query the Ceph cluster on server A.
However, I cannot even query a simple .txt file on the Ceph cluster, because of plain connection errors. If I try the same thing with boto3, I can query Ceph without any problem, but with Spark it does not work.
Here is the simple boto3 example that works:
import boto3

s3 = boto3.client('s3',
                  endpoint_url='https://ceph-host.com',
                  aws_access_key_id='my_key',
                  aws_secret_access_key='my_access_key')

# list objects in bucket
bucket_name = 'bucket_name'
objects = s3.list_objects_v2(Bucket=bucket_name)['Contents']

# print object names
for obj in objects:
    print(obj['Key'])
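One detail worth making explicit: the working boto3 client never specifies a port, so for an https endpoint it connects to 443. A small illustrative helper (not part of boto3) to show which port a client actually ends up talking to:

```python
from urllib.parse import urlsplit

def effective_port(endpoint_url: str) -> int:
    """Port an HTTP client will actually connect to for this endpoint URL."""
    parts = urlsplit(endpoint_url)
    if parts.port is not None:
        return parts.port
    # No explicit port: fall back to the scheme's default.
    return 443 if parts.scheme == "https" else 80

print(effective_port("https://ceph-host.com"))       # 443 (the boto3 case above)
print(effective_port("https://ceph-host.com:6800"))  # 6800 (the Spark case below)
```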
And when trying the same thing with Spark:
import os
import pyspark

# Ceph authentication
access_key = 'my_key'
secret_key = 'my_secret_key'
ceph_host = 'https://ceph-host.com'  # Add your Ceph host here
ceph_port = 6800

# Creating a PySpark session
cwd = os.getcwd()
jars = "{0}/sqlite-jdbc-3.41.0.0.jar,{0}/hadoop-aws-3.3.2.jar,{0}/aws-java-sdk-bundle-1.11.1026.jar".format(cwd)
conf = pyspark.SparkConf() \
    .setAppName("1_main") \
    .setMaster("spark://master-host.com:port") \
    .set("spark.jars", jars) \
    .set("spark.driver.extraClassPath", jars.replace(",", ":")) \
    .set("spark.executor.extraClassPath", jars.replace(",", ":"))
# note: spark.jars is comma-separated, but extraClassPath uses the
# platform path separator (':' on Linux), not commas
spark = pyspark.SparkContext(conf=conf)

# Adding the Ceph auth to the Spark session
# (I have too many parameters here because I have tried everything I could find online)
hadoopConf = spark._jsc.hadoopConfiguration()
hadoopConf.set("fs.s3a.access.key", access_key)
hadoopConf.set("fs.s3a.secret.key", secret_key)
hadoopConf.set("fs.s3a.endpoint", "{}:{}".format(ceph_host, ceph_port))
hadoopConf.set("fs.s3a.connection.ssl.enabled", "true")
hadoopConf.set("spark.hadoop.fs.s3a.path.style.access", "true")
hadoopConf.set("fs.s3a.path.style.access", "true")
hadoopConf.set("spark.hadoop.fs.s3a.endpoint.region", "default")
hadoopConf.set("com.amazonaws.services.s3.disableGetObjectMD5Validation", "true")
hadoopConf.set("fs.s3a.attempts.maximum", "5")
hadoopConf.set("fs.s3a.connection.establish.timeout", "5000")
hadoopConf.set("fs.s3a.connection.timeout", "10000")
hadoopConf.set("fs.s3a.signing-algorithm", "S3SignerType")

# Querying a simple .txt file
url = 's3a://bucketname/text.rtf'
l1 = spark.textFile(url).collect()
for row in l1:
    print(str(row))
    break
Depending on which port I use, I get different errors, for example:
ceph_port = 6800
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.fs.s3a.AWSClientIOException: getFileStatus on s3a://bucketname/text.rtf: com.amazonaws.SdkClientException: Unable to execute HTTP request: Unsupported or unrecognized SSL message: Unable to execute HTTP request: Unsupported or unrecognized SSL message
ceph_port = 443
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.nio.file.AccessDeniedException: s3a://bucketname/text.rtf: getFileStatus on s3a://bucketname/text.rtf: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: tx00000080ad2dcdd081322-00642f09fd-5f7a-default; S3 Extended Request ID: null; Proxy: null), S3 Extended Request ID: null:403 Forbidden
ceph_port = 1
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.fs.s3a.AWSClientIOException: getFileStatus on s3a://bucketname/text.rtf: com.amazonaws.SdkClientException: Unable to execute HTTP request: Connect to ceph-host.com:1 [ceph-host.com/host-ip-address] failed: Connection refused (Connection refused)
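For reference, the configuration above mixes `spark.hadoop.`-prefixed keys (which belong on the `SparkConf`) with bare `fs.s3a.*` keys (which belong in the Hadoop configuration). Below is a minimal sketch of one consolidated set of options, assuming the RGW answers HTTPS on the default port 443 like the working boto3 client does; the hostname, credentials, and the choice to drop the explicit port are assumptions, not a confirmed fix:

```python
def s3a_options(host, access_key, secret_key):
    """Build one consistent set of fs.s3a.* options for an HTTPS RGW endpoint.

    Keys here are plain Hadoop keys; prefix each with "spark.hadoop."
    if you set it on SparkConf instead of on hadoopConfiguration().
    """
    return {
        # No explicit port: https implies 443, matching the working boto3 client.
        "fs.s3a.endpoint": host,
        "fs.s3a.access.key": access_key,
        "fs.s3a.secret.key": secret_key,
        "fs.s3a.connection.ssl.enabled": "true",
        # Ceph RGW buckets are usually addressed path-style, not virtual-host style.
        "fs.s3a.path.style.access": "true",
    }

opts = s3a_options("https://ceph-host.com", "my_key", "my_secret_key")
# Same options, prefixed for use with SparkConf.setAll() before the context is built:
spark_conf_pairs = {"spark.hadoop." + k: v for k, v in opts.items()}
```

With this shape you would call `conf.setAll(spark_conf_pairs.items())` once before creating the `SparkContext`, instead of mutating `spark._jsc.hadoopConfiguration()` afterwards, so the executors see the same settings as the driver.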