Cannot connect Spark with Cassandra as docker containers


I am running Cassandra and Spark docker containers from a compose.yaml like this:

services:
  spark:
    image: docker.io/bitnami/spark:3.4
    container_name: spark_master
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
      - SPARK_USER=spark
    ports:
      - '8081:8081'
  spark-worker:
    image: docker.io/bitnami/spark:3.4
    container_name: spark_worker
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
      - SPARK_USER=spark
  cassandra:
    container_name: cassandra-container
    image: cassandra:latest
    ports:
      - "7000:7000"
      - "7001:7001"
      - "7199:7199"
      - "9042:9042"
      - "9160:9160"
    volumes:
      - cassandra-data:/var/lib/cassandra
    restart: always
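
Note: compose only accepts the named volume cassandra-data if it is also declared at the top level of compose.yaml; if that declaration is not somewhere else in the file, it needs to be added:

volumes:
  cassandra-data: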

I ingested some data into Cassandra and am trying to read it with pyspark. I ran a hello-world program inside the Spark container and it ran successfully. I copy my pyspark files into the
/opt/bitnami/spark/spark-script/
directory inside the container and can run them from there. I also placed
spark-cassandra-connector_2.12-latest_version.jar
into
/opt/bitnami/spark/jars/
inside the container.

Here is the Python file where I try to connect to Cassandra and read data into pyspark:

from pyspark.sql import SparkSession

# Define your Cassandra connection settings
cassandra_host = "127.0.0.1"  # Replace with the actual IP address of your Cassandra container
cassandra_port = "9042"  # Default Cassandra port
cassandra_keyspace = "mykey_space"  # Replace with your Cassandra keyspace
cassandra_table = "test_table"  # Replace with your Cassandra table

# Create a Spark session
spark = SparkSession.builder \
    .appName("CassandraConnectionTest") \
    .config("spark.cassandra.connection.host", cassandra_host) \
    .config("spark.cassandra.connection.port", cassandra_port) \
    .getOrCreate()

# Test the Cassandra connection
try:
    df = spark.read \
        .format("org.apache.spark.sql.cassandra") \
        .options(table=cassandra_table, keyspace=cassandra_keyspace) \
        .load()
    df.show(5)
    print("Cassandra connection test successful.")
except Exception as e:
    print(f"Error: {str(e)}")
    print("Cassandra connection test failed.")

# Stop the Spark session
spark.stop()

I run the following command to execute this script:

docker exec -it spark_worker /bin/bash -c "spark-submit --jars /opt/bitnami/spark/jars/spark-cassandra-connector_2.12-latest_version.jar /opt/bitnami/spark/spark-script/cassandra_spark.py"
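
(The jar file name above is a placeholder; as an alternative to managing the jar by hand, spark-submit can resolve a concrete connector release, together with its transitive dependencies, from Maven Central via --packages. A sketch, assuming the 3.4.x connector line that matches Spark 3.4:)

docker exec -it spark_worker /bin/bash -c "spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.12:3.4.1 /opt/bitnami/spark/spark-script/cassandra_spark.py"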

The error I get is as follows:

23/09/17 17:45:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/09/17 17:45:11 INFO SparkContext: Running Spark version 3.4.1
23/09/17 17:45:12 INFO ResourceUtils: ==============================================================
23/09/17 17:45:12 INFO ResourceUtils: No custom resources configured for spark.driver.
23/09/17 17:45:12 INFO ResourceUtils: ==============================================================
23/09/17 17:45:12 INFO SparkContext: Submitted application: CassandraConnectionTest
23/09/17 17:45:12 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
23/09/17 17:45:12 INFO ResourceProfile: Limiting resource is cpu
23/09/17 17:45:12 INFO ResourceProfileManager: Added ResourceProfile id: 0
23/09/17 17:45:12 INFO SecurityManager: Changing view acls to: spark
23/09/17 17:45:12 INFO SecurityManager: Changing modify acls to: spark
23/09/17 17:45:12 INFO SecurityManager: Changing view acls groups to: 
23/09/17 17:45:12 INFO SecurityManager: Changing modify acls groups to: 
23/09/17 17:45:12 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: spark; groups with view permissions: EMPTY; users with modify permissions: spark; groups with modify permissions: EMPTY
23/09/17 17:45:12 INFO Utils: Successfully started service 'sparkDriver' on port 37035.
23/09/17 17:45:12 INFO SparkEnv: Registering MapOutputTracker
23/09/17 17:45:12 INFO SparkEnv: Registering BlockManagerMaster
23/09/17 17:45:12 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
23/09/17 17:45:12 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
23/09/17 17:45:12 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
23/09/17 17:45:12 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-34cb5ed0-030e-41f5-846f-9f43fbb76d7b
23/09/17 17:45:12 INFO MemoryStore: MemoryStore started with capacity 434.4 MiB
23/09/17 17:45:12 INFO SparkEnv: Registering OutputCommitCoordinator
23/09/17 17:45:13 INFO JettyUtils: Start Jetty 0.0.0.0:4040 for SparkUI
23/09/17 17:45:13 INFO Utils: Successfully started service 'SparkUI' on port 4040.
23/09/17 17:45:13 INFO SparkContext: Added JAR file:///opt/bitnami/spark/jars/spark-cassandra-connector_2.12-latest_version.jar at spark://b168f719ac50:37035/jars/spark-cassandra-connector_2.12-latest_version.jar with timestamp 1694972711924
23/09/17 17:45:13 INFO Executor: Starting executor ID driver on host b168f719ac50
23/09/17 17:45:13 INFO Executor: Starting executor with user classpath (userClassPathFirst = false): ''
23/09/17 17:45:13 INFO Executor: Fetching spark://b168f719ac50:37035/jars/spark-cassandra-connector_2.12-latest_version.jar with timestamp 1694972711924
23/09/17 17:45:13 INFO TransportClientFactory: Successfully created connection to b168f719ac50/172.28.0.2:37035 after 46 ms (0 ms spent in bootstraps)
23/09/17 17:45:13 INFO Utils: Fetching spark://b168f719ac50:37035/jars/spark-cassandra-connector_2.12-latest_version.jar to /tmp/spark-dbc3271d-513b-43b0-9990-e813f05d0634/userFiles-2e5335c1-140a-4c90-a340-fdc663636446/fetchFileTemp7263685075750385356.tmp
23/09/17 17:45:13 INFO Executor: Adding file:/tmp/spark-dbc3271d-513b-43b0-9990-e813f05d0634/userFiles-2e5335c1-140a-4c90-a340-fdc663636446/spark-cassandra-connector_2.12-latest_version.jar to class loader
23/09/17 17:45:13 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 45859.
23/09/17 17:45:13 INFO NettyBlockTransferService: Server created on b168f719ac50:45859
23/09/17 17:45:13 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
23/09/17 17:45:13 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, b168f719ac50, 45859, None)
23/09/17 17:45:13 INFO BlockManagerMasterEndpoint: Registering block manager b168f719ac50:45859 with 434.4 MiB RAM, BlockManagerId(driver, b168f719ac50, 45859, None)
23/09/17 17:45:13 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, b168f719ac50, 45859, None)
23/09/17 17:45:13 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, b168f719ac50, 45859, None)
23/09/17 17:45:14 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
23/09/17 17:45:14 INFO SharedState: Warehouse path is 'file:/opt/bitnami/spark/spark-warehouse'.
Error: An error occurred while calling o34.load.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: org.apache.spark.sql.cassandra. Please find packages at `https://spark.apache.org/third-party-projects.html`.
    at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:738)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:208)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.cassandra.DefaultSource
    at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
    at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
    at scala.util.Failure.orElse(Try.scala:224)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
    ... 15 more

Cassandra connection test failed.
23/09/17 17:45:16 INFO SparkContext: SparkContext is stopping with exitCode 0.
23/09/17 17:45:16 INFO SparkUI: Stopped Spark web UI at http://b168f719ac50:4040
23/09/17 17:45:16 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/09/17 17:45:16 INFO MemoryStore: MemoryStore cleared
23/09/17 17:45:16 INFO BlockManager: BlockManager stopped
23/09/17 17:45:16 INFO BlockManagerMaster: BlockManagerMaster stopped
23/09/17 17:45:16 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/09/17 17:45:16 INFO SparkContext: Successfully stopped SparkContext
23/09/17 17:45:16 INFO ShutdownHookManager: Shutdown hook called
23/09/17 17:45:16 INFO ShutdownHookManager: Deleting directory /tmp/spark-dbc3271d-513b-43b0-9990-e813f05d0634/pyspark-474abab0-e74c-44c9-b6b8-fd378e8ea964
23/09/17 17:45:16 INFO ShutdownHookManager: Deleting directory /tmp/spark-d094d4f5-8b21-4b18-84b5-df94e2381346
23/09/17 17:45:16 INFO ShutdownHookManager: Deleting directory /tmp/spark-dbc3271d-513b-43b0-9990-e813f05d0634

I always make sure the containers run on the same network. What am I doing wrong here? Any help would be appreciated.

docker apache-spark pyspark spark-cassandra-connector
1 Answer

The problem is in this line:

cassandra_host = "127.0.0.1"

because
127.0.0.1
is the address of the local "machine", which in this case is the driver or worker Docker instance. It is better to use the name of the Cassandra docker service, which in your case is:

cassandra_host = "cassandra"
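
If name resolution between the containers is in doubt, it can be checked from inside the worker container; a quick sanity check (assuming both services are on the same compose network, using the Python interpreter that ships with the image):

docker exec -it spark_worker python -c "import socket; print(socket.gethostbyname('cassandra'))"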

See the docker-compose documentation for more details about networking.
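
Putting it together, the session builder from the question becomes (a minimal sketch, reusing the keyspace and table names from the question):

from pyspark.sql import SparkSession

# Use the compose service name; Docker's embedded DNS resolves it
# to the Cassandra container on the shared compose network.
spark = SparkSession.builder \
    .appName("CassandraConnectionTest") \
    .config("spark.cassandra.connection.host", "cassandra") \
    .config("spark.cassandra.connection.port", "9042") \
    .getOrCreate()

df = spark.read \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="test_table", keyspace="mykey_space") \
    .load()
df.show(5)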
