I'm running Cassandra and Spark Docker containers defined in compose.yaml, as shown below:
spark:
  image: docker.io/bitnami/spark:3.4
  container_name: spark_master
  environment:
    - SPARK_MODE=master
    - SPARK_RPC_AUTHENTICATION_ENABLED=no
    - SPARK_RPC_ENCRYPTION_ENABLED=no
    - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
    - SPARK_SSL_ENABLED=no
    - SPARK_USER=spark
  ports:
    - '8081:8081'
spark-worker:
  image: docker.io/bitnami/spark:3.4
  container_name: spark_worker
  environment:
    - SPARK_MODE=worker
    - SPARK_MASTER_URL=spark://spark:7077
    - SPARK_WORKER_MEMORY=1G
    - SPARK_WORKER_CORES=1
    - SPARK_RPC_AUTHENTICATION_ENABLED=no
    - SPARK_RPC_ENCRYPTION_ENABLED=no
    - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
    - SPARK_SSL_ENABLED=no
    - SPARK_USER=spark
cassandra:
  container_name: cassandra-container
  image: cassandra:latest
  ports:
    - "7000:7000"
    - "7001:7001"
    - "7199:7199"
    - "9042:9042"
    - "9160:9160"
  volumes:
    - cassandra-data:/var/lib/cassandra
  restart: always
I ingested some data into Cassandra and am now trying to read it with PySpark. A hello-world program runs successfully inside the Spark container; I copy my PySpark files into the /opt/bitnami/spark/spark-script/ directory inside the container and can run them from there. I also placed spark-cassandra-connector_2.12-latest_version.jar into /opt/bitnami/spark/jars/ inside the container.
Here is the Python file I use to connect to Cassandra and read data into PySpark:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

# Define your Cassandra connection settings
cassandra_host = "127.0.0.1"        # Replace with the actual IP address of your Cassandra container
cassandra_port = "9042"             # Default Cassandra port
cassandra_keyspace = "mykey_space"  # Replace with your Cassandra keyspace
cassandra_table = "test_table"      # Replace with your Cassandra table

# Create a Spark session
spark = SparkSession.builder \
    .appName("CassandraConnectionTest") \
    .config("spark.cassandra.connection.host", cassandra_host) \
    .config("spark.cassandra.connection.port", cassandra_port) \
    .getOrCreate()

# Test the Cassandra connection
try:
    df = spark.read \
        .format("org.apache.spark.sql.cassandra") \
        .options(table=cassandra_table, keyspace=cassandra_keyspace) \
        .load()
    df.show(5)
    print("Cassandra connection test successful.")
except Exception as e:
    print(f"Error: {str(e)}")
    print("Cassandra connection test failed.")

# Stop the Spark session
spark.stop()
I run this script with the following command:
docker exec -it spark_worker /bin/bash -c "spark-submit --jars /opt/bitnami/spark/jars/spark-cassandra-connector_2.12-latest_version.jar /opt/bitnami/spark/spark-script/cassandra_spark.py"
The error I get is as follows:
23/09/17 17:45:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/09/17 17:45:11 INFO SparkContext: Running Spark version 3.4.1
23/09/17 17:45:12 INFO ResourceUtils: ==============================================================
23/09/17 17:45:12 INFO ResourceUtils: No custom resources configured for spark.driver.
23/09/17 17:45:12 INFO ResourceUtils: ==============================================================
23/09/17 17:45:12 INFO SparkContext: Submitted application: CassandraConnectionTest
23/09/17 17:45:12 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
23/09/17 17:45:12 INFO ResourceProfile: Limiting resource is cpu
23/09/17 17:45:12 INFO ResourceProfileManager: Added ResourceProfile id: 0
23/09/17 17:45:12 INFO SecurityManager: Changing view acls to: spark
23/09/17 17:45:12 INFO SecurityManager: Changing modify acls to: spark
23/09/17 17:45:12 INFO SecurityManager: Changing view acls groups to:
23/09/17 17:45:12 INFO SecurityManager: Changing modify acls groups to:
23/09/17 17:45:12 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: spark; groups with view permissions: EMPTY; users with modify permissions: spark; groups with modify permissions: EMPTY
23/09/17 17:45:12 INFO Utils: Successfully started service 'sparkDriver' on port 37035.
23/09/17 17:45:12 INFO SparkEnv: Registering MapOutputTracker
23/09/17 17:45:12 INFO SparkEnv: Registering BlockManagerMaster
23/09/17 17:45:12 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
23/09/17 17:45:12 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
23/09/17 17:45:12 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
23/09/17 17:45:12 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-34cb5ed0-030e-41f5-846f-9f43fbb76d7b
23/09/17 17:45:12 INFO MemoryStore: MemoryStore started with capacity 434.4 MiB
23/09/17 17:45:12 INFO SparkEnv: Registering OutputCommitCoordinator
23/09/17 17:45:13 INFO JettyUtils: Start Jetty 0.0.0.0:4040 for SparkUI
23/09/17 17:45:13 INFO Utils: Successfully started service 'SparkUI' on port 4040.
23/09/17 17:45:13 INFO SparkContext: Added JAR file:///opt/bitnami/spark/jars/spark-cassandra-connector_2.12-latest_version.jar at spark://b168f719ac50:37035/jars/spark-cassandra-connector_2.12-latest_version.jar with timestamp 1694972711924
23/09/17 17:45:13 INFO Executor: Starting executor ID driver on host b168f719ac50
23/09/17 17:45:13 INFO Executor: Starting executor with user classpath (userClassPathFirst = false): ''
23/09/17 17:45:13 INFO Executor: Fetching spark://b168f719ac50:37035/jars/spark-cassandra-connector_2.12-latest_version.jar with timestamp 1694972711924
23/09/17 17:45:13 INFO TransportClientFactory: Successfully created connection to b168f719ac50/172.28.0.2:37035 after 46 ms (0 ms spent in bootstraps)
23/09/17 17:45:13 INFO Utils: Fetching spark://b168f719ac50:37035/jars/spark-cassandra-connector_2.12-latest_version.jar to /tmp/spark-dbc3271d-513b-43b0-9990-e813f05d0634/userFiles-2e5335c1-140a-4c90-a340-fdc663636446/fetchFileTemp7263685075750385356.tmp
23/09/17 17:45:13 INFO Executor: Adding file:/tmp/spark-dbc3271d-513b-43b0-9990-e813f05d0634/userFiles-2e5335c1-140a-4c90-a340-fdc663636446/spark-cassandra-connector_2.12-latest_version.jar to class loader
23/09/17 17:45:13 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 45859.
23/09/17 17:45:13 INFO NettyBlockTransferService: Server created on b168f719ac50:45859
23/09/17 17:45:13 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
23/09/17 17:45:13 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, b168f719ac50, 45859, None)
23/09/17 17:45:13 INFO BlockManagerMasterEndpoint: Registering block manager b168f719ac50:45859 with 434.4 MiB RAM, BlockManagerId(driver, b168f719ac50, 45859, None)
23/09/17 17:45:13 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, b168f719ac50, 45859, None)
23/09/17 17:45:13 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, b168f719ac50, 45859, None)
23/09/17 17:45:14 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
23/09/17 17:45:14 INFO SharedState: Warehouse path is 'file:/opt/bitnami/spark/spark-warehouse'.
Error: An error occurred while calling o34.load.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: org.apache.spark.sql.cassandra. Please find packages at `https://spark.apache.org/third-party-projects.html`.
at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:738)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:208)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.cassandra.DefaultSource
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
at scala.util.Failure.orElse(Try.scala:224)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
... 15 more
Cassandra connection test failed.
23/09/17 17:45:16 INFO SparkContext: SparkContext is stopping with exitCode 0.
23/09/17 17:45:16 INFO SparkUI: Stopped Spark web UI at http://b168f719ac50:4040
23/09/17 17:45:16 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/09/17 17:45:16 INFO MemoryStore: MemoryStore cleared
23/09/17 17:45:16 INFO BlockManager: BlockManager stopped
23/09/17 17:45:16 INFO BlockManagerMaster: BlockManagerMaster stopped
23/09/17 17:45:16 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/09/17 17:45:16 INFO SparkContext: Successfully stopped SparkContext
23/09/17 17:45:16 INFO ShutdownHookManager: Shutdown hook called
23/09/17 17:45:16 INFO ShutdownHookManager: Deleting directory /tmp/spark-dbc3271d-513b-43b0-9990-e813f05d0634/pyspark-474abab0-e74c-44c9-b6b8-fd378e8ea964
23/09/17 17:45:16 INFO ShutdownHookManager: Deleting directory /tmp/spark-d094d4f5-8b21-4b18-84b5-df94e2381346
23/09/17 17:45:16 INFO ShutdownHookManager: Deleting directory /tmp/spark-dbc3271d-513b-43b0-9990-e813f05d0634
I always make sure the containers are running on the same network. What am I doing wrong here? Any help would be appreciated.
The problem is in this line:
cassandra_host = "127.0.0.1"
because 127.0.0.1 is the address of the local "machine", which in this case is the driver or worker Docker instance itself, not Cassandra. It's better to use the name of the Cassandra service from your compose file, which in your case is:
cassandra_host = "cassandra"
Docker Compose puts the services on a shared network and resolves each service name to its container's address. See the docker-compose documentation on networking for more details.
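With that change, the relevant part of your script would look like this (a minimal sketch; "cassandra" is the service name from your compose file above, and the rest of the configuration stays as you had it):

```python
from pyspark.sql import SparkSession

# Use the compose service name instead of 127.0.0.1.
# Docker's embedded DNS resolves "cassandra" to the Cassandra
# container's address on the shared compose network.
cassandra_host = "cassandra"
cassandra_port = "9042"

spark = SparkSession.builder \
    .appName("CassandraConnectionTest") \
    .config("spark.cassandra.connection.host", cassandra_host) \
    .config("spark.cassandra.connection.port", cassandra_port) \
    .getOrCreate()
```

Note that since Spark resolves the host on the executors as well as the driver, using the service name works from both the master and worker containers, whereas a loopback address never leaves the container it is used in.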