Connection refused when using spark-submit


I'm new to Apache Spark.

Currently, I'm trying to use Structured Streaming to read Avro-encoded data from Kafka, with the schemas managed by a Schema Registry.

I installed Spark on Docker with the following Compose file:

version: "3.3"
services:
  spark-master:
    image: docker.io/bitnami/spark:3.2.4
    ports:
      - "39090:8080"
      - "37077:7077"
    volumes:
       - ./apps:/opt/spark-apps
       - ./data:/opt/spark-data
    environment:
      - SPARK_LOCAL_IP=spark-master
      - SPARK_MODE=master
  spark-worker-a:
    image: docker.io/bitnami/spark:3.2.4
    ports:
      - "39091:8081"
      - "37000:7000"
    depends_on:
      - spark-master
    environment:
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=1G
      - SPARK_DRIVER_MEMORY=1G
      - SPARK_EXECUTOR_MEMORY=1G
      - SPARK_MODE=worker
      - SPARK_LOCAL_IP=spark-worker-a
      - SPARK_USER=spark
      - SPARK_WORKER_PORT=7000

    volumes:
       - ./apps:/opt/spark-apps
       - ./data:/opt/spark-data
  spark-worker-b:
    image: docker.io/bitnami/spark:3.2.4
    ports:
      - "39092:8081"
      - "37001:7000"
    depends_on:
      - spark-master
    environment:
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_CORES=1
      - SPARK_WORKER_MEMORY=1G
      - SPARK_DRIVER_MEMORY=1G
      - SPARK_EXECUTOR_MEMORY=1G
      - SPARK_MODE=worker
      - SPARK_LOCAL_IP=spark-worker-b
      - SPARK_USER=spark
      - SPARK_WORKER_PORT=7000
    volumes:
       - ./apps:/opt/spark-apps
       - ./data:/opt/spark-data

and wrote the following Scala script to do this:

package poc

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.{AbrisConfig, FromAvroConfig}

object ConfluentAvroToStructureStreaming {

  def main(args: Array[String]): Unit = {
    // ABRiS: fetch the latest reader schema for the topic's value subject
    // (TopicNameStrategy) from the Schema Registry.
    val abrisConfig: FromAvroConfig = AbrisConfig
      .fromConfluentAvro
      .downloadReaderSchemaByLatestVersion
      .andTopicNameStrategy("MyTopic")
      .usingSchemaRegistry("http://#.#.#.#:8081")

    val spark = SparkSession
      .builder
      .appName("Simple Application")
      .getOrCreate()

    // Read the topic as a stream; the Avro payload arrives in the binary `value` column.
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "#.#.#.#:9092")
      .option("startingOffsets", "earliest")
      .option("subscribe", "MyTopic")
      .load()

    // Deserialize the Confluent-framed Avro payload and flatten its fields into columns.
    val deserializedAvro = df.select(from_avro(col("value"), abrisConfig).as("data"))
      .select(col("data.*"))

    deserializedAvro.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()
  }
}
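
For reference, the SparkSession above leaves all driver networking at its defaults, so executors connect back to whatever hostname and ephemeral port the driver happens to bind. Spark exposes settings to pin these; a minimal sketch follows (the host name and port numbers are placeholders, not values from my setup):

import org.apache.spark.sql.SparkSession

// Sketch only: spark.driver.host must be an address the workers can reach,
// and the two ports are fixed so that they could be published in docker-compose.
val spark = SparkSession
  .builder
  .appName("Simple Application")
  .config("spark.driver.host", "host.docker.internal") // placeholder address
  .config("spark.driver.bindAddress", "0.0.0.0")
  .config("spark.driver.port", "37010")                // placeholder port
  .config("spark.blockManager.port", "37011")          // placeholder port
  .getOrCreate()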

Then I deploy the script to Spark with the following command:

spark-submit --master spark://spark-master:7077 \
  --packages za.co.absa:abris_2.12:6.2.0,org.apache.kafka:kafka-clients:3.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 \
  --repositories https://packages.confluent.io/maven \
  --class poc.ConfluentAvroToStructureStreaming /spark-confluent-avro-poc-8_2.12-0.1.jar

but I get the following error:

23/04/29 18:12:48 ERROR Utils: Aborting task
java.io.IOException: Failed to connect to 4a3a275e63df/#.#.#.#:38975
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:288)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:218)
        at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:230)
        at org.apache.spark.rpc.netty.NettyRpcEnv.downloadClient(NettyRpcEnv.scala:399)
        at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$openChannel$4(NettyRpcEnv.scala:367)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1525)
        at org.apache.spark.rpc.netty.NettyRpcEnv.openChannel(NettyRpcEnv.scala:366)
        at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:791)
        at org.apache.spark.util.Utils$.fetchFile(Utils.scala:549)
        at org.apache.spark.executor.Executor.$anonfun$updateDependencies$13(Executor.scala:962)
        at org.apache.spark.executor.Executor.$anonfun$updateDependencies$13$adapted(Executor.scala:954)
        at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985)
        at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
        at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
        at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984)
        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:954)
        at org.apache.spark.executor.Executor.<init>(Executor.scala:247)
        at org.apache.spark.scheduler.local.LocalEndpoint.<init>(LocalSchedulerBackend.scala:64)
        at org.apache.spark.scheduler.local.LocalSchedulerBackend.start(LocalSchedulerBackend.scala:132)
        at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:220)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:591)
        at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2700)
        at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:949)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:943)
        at poc.ConfluentAvroToStructureStreaming$.main(ConfluentAvroToStructureStreaming.scala:24)
        at poc.ConfluentAvroToStructureStreaming.main(ConfluentAvroToStructureStreaming.scala)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:966)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:191)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:214)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1054)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1063)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: 4a3a275e63df/#.#.#.#:38975
Caused by: java.net.ConnectException: Connection refused
        at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:777)
        at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:707)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:829)
23/04/29 18:12:48 ERROR SparkContext: Error initializing SparkContext.
java.io.IOException: Failed to connect to 4a3a275e63df/#.#.#.#:38975
        <same stack trace as above>
23/04/29 18:12:48 INFO SparkUI: Stopped Spark web UI at http://4a3a275e63df:4040
23/04/29 18:12:48 ERROR Utils: Uncaught exception in thread main
java.lang.NullPointerException
        at org.apache.spark.scheduler.local.LocalSchedulerBackend.org$apache$spark$scheduler$local$LocalSchedulerBackend$$stop(LocalSchedulerBackend.scala:173)
        at org.apache.spark.scheduler.local.LocalSchedulerBackend.stop(LocalSchedulerBackend.scala:144)
        at org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:927)
        at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2563)
        at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2096)
        at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1471)
        at org.apache.spark.SparkContext.stop(SparkContext.scala:2096)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:687)
        at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2700)
        at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:949)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:943)
        at poc.ConfluentAvroToStructureStreaming$.main(ConfluentAvroToStructureStreaming.scala:24)
        at poc.ConfluentAvroToStructureStreaming.main(ConfluentAvroToStructureStreaming.scala)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:966)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:191)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:214)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1054)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1063)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
23/04/29 18:12:48 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/04/29 18:12:48 INFO MemoryStore: MemoryStore cleared
23/04/29 18:12:48 INFO BlockManager: BlockManager stopped
23/04/29 18:12:48 INFO BlockManagerMaster: BlockManagerMaster stopped
23/04/29 18:12:48 WARN MetricsSystem: Stopping a MetricsSystem that is not running
23/04/29 18:12:48 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/04/29 18:12:48 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" java.io.IOException: Failed to connect to 4a3a275e63df/#.#.#.#:38975
        <same stack trace as above>
23/04/29 18:12:48 ERROR Utils: Uncaught exception in thread shutdown-hook-0
java.lang.ExceptionInInitializerError
        at org.apache.spark.executor.Executor.stop(Executor.scala:333)
        at org.apache.spark.executor.Executor.$anonfun$stopHookReference$1(Executor.scala:76)
        at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
        at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2048)
        at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at scala.util.Try$.apply(Try.scala:213)
        at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
        at org.apache.spark.shuffle.ShuffleBlockPusher$.<init>(ShuffleBlockPusher.scala:465)
        at org.apache.spark.shuffle.ShuffleBlockPusher$.<clinit>(ShuffleBlockPusher.scala)
        ... 16 more
23/04/29 18:12:48 WARN ShutdownHookManager: ShutdownHook '' failed, java.util.concurrent.ExecutionException: java.lang.ExceptionInInitializerError
java.util.concurrent.ExecutionException: java.lang.ExceptionInInitializerError
        at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:205)
        at org.apache.hadoop.util.ShutdownHookManager.executeShutdown(ShutdownHookManager.java:124)
        at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:95)
Caused by: java.lang.ExceptionInInitializerError
        <same stack trace and cause as above>

I searched for solutions to this error and found this post:

java.io.IOException in Spark local mode

I had a feeling that Spark would run fine as long as the Scala script didn't transform the data. That turned out to be true: the job ran and showed up in the Spark UI.

(screenshot: the Spark UI showing the running application)

But the data as read is useless without deserialization, so I do need that transformation.

I tried export SPARK_LOCAL_IP="127.0.0.1" as suggested in that post, and it worked! (The data was transformed as expected.) However, I noticed that no application ID appears in the Spark UI, and top shows no tasks running on the Spark workers.

(screenshot: the Spark UI with no application ID listed)
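
One check that might make the difference visible (my addition, not from that post): right after getOrCreate(), print which master the running session actually registered with and its application ID:

// Sketch only: if this prints local[*] rather than spark://spark-master:7077,
// the job ran on the driver alone, which would match the missing application ID
// in the master's UI and the idle workers.
println(s"master = ${spark.sparkContext.master}")
println(s"appId  = ${spark.sparkContext.applicationId}")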

apache-spark