我正在尝试使用 bitnami Spark 映像创建 Spark 集群,并将其连接到由 bitnami Minio 映像创建的 Minio 存储。
以下内容由 Bitnami 以位形式提供,是我的 docker-compose 文件:
version: '2'
networks:
spark-network:
driver: bridge
services:
minio:
image: bitnami/minio:latest
ports:
- '9000:9000'
- '9001:9001'
environment:
- MINIO_ROOT_USER=<INSERT>
- MINIO_ROOT_PASSWORD=<INSERT>
networks:
- spark-network
spark:
image: bitnami/spark:3.3.2
environment:
- SPARK_MODE=master
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
- SPARK_SSL_ENABLED=no
ports:
- '8080:8080'
- '7077:7077'
networks:
- spark-network
depends_on:
- minio
spark-worker:
image: bitnami/spark:3.3.2
environment:
- SPARK_MODE=worker
- SPARK_MASTER_URL=spark://spark:7077
- SPARK_WORKER_MEMORY=3G
- SPARK_WORKER_CORES=2
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
- SPARK_SSL_ENABLED=no
networks:
- spark-network
depends_on:
- spark
- minio
当点击 docker compose 时,一切看起来都很好。工作人员似乎已连接到主机,并且可以从本地主机访问 Minio(+我可以上传数据)。我尝试将 localhost 更改为主 docker 容器中提供的 IP。
以下是我的 Spark 会话配置:
spark = SparkSession.builder \
.master(f'spark://localhost:7077') \
.appName("docker_spark_minio_storage") \
.config('spark.jars', '/opt/bitnami/spark/jars/hadoop-aws-3.3.2') \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000") \
.config("spark.hadoop.fs.s3a.access.key", <INSERT>) \
.config("spark.hadoop.fs.s3a.secret.key", <INSERT>) \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.getOrCreate()
这是我的 PySpark 代码,从我的终端运行,即不将脚本上传到任何容器:
df = (spark.read
.format('csv')
.option('inferSchema', 'true')
.option('header', 'true')
.option('delimiter', delimiter)
.load('s3a://<CONTAINER</<FILENAME.csv>))
问题:
信息:
谢谢!
我已经用谷歌搜索了好几天,询问了 ChatGPT 等。我找不到任何人遇到过类似的问题。我期待 Spark 容器从 Minio 读取我的数据并简单地显示它,以便我可以到达某个地方。
我也遇到了同样的问题,但能够使用下面的配置从 MinIO 读取数据,而不是 localhost,你应该使用容器名称并允许内部端口映射。
conf = (
SparkConf()
.setAppName("file_reader")
.set("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.2")
.set("fs.s3a.access.key", os.getenv("MINIO_ACCESS_KEY"))
.set("fs.s3a.secret.key", os.getenv("MINIO_SECRET_KEY"))
.set("spark.hadoop.fs.s3a.endpoint", os.getenv("MINIO_URL"))
.set('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')
.set("fs.s3a.connection.ssl.enabled", "true")
.set("fs.s3a.path.style.access", "true")
.set("fs.s3a.attempts.maximum", "1")
.set("fs.s3a.connection.establish.timeout", "5000")
.set("fs.s3a.connection.timeout", "10000")
)
spark =SparkSession.builder.config(conf=conf).master("spark://spark:7077").getOrCreate()
df = spark.read.option("header", "true").csv(
f"s3a://{bucket_name}/{file_key}")
谢谢你。希望有帮助。
上述问题是在会话初始化时首次加载包。
.config('spark.jars', '/opt/bitnami/spark/jars/*')
如果您调用提交操作的应用程序中有所需的 jar,则上述内容将起作用