I am running a Spark remote kernel on JupyterHub, but I cannot read a CSV file on the executor and driver cluster nodes. I can add the file with addFile, but reading the CSV fails with the error below.
PySpark 3.4.2 commands from a JupyterHub 3.2.2 notebook:
import pyspark

sc = pyspark.SparkContext.getOrCreate()
sc.addFile('file://' + '/opt/spark-3.4.2-bin-hadoop3/work-dir/stock-market-india/FullDataCsv/ZEEL__EQ__NSE__NSE__MINUTE.csv')
fdd = sc.textFile('ZEEL__EQ__NSE__NSE__MINUTE.csv')
fdd.collect()
I can see the temporary files under the following directory in both executor pods:
#ll /var/data/spark-13acd4aa-2344-4b00-800a-daa4771b7d1f/spark-2c886ade-ac67-4fa2-ba29-e5e4844845fe
-rw-r--r-- 1 jovyan users 21429281 Feb 5 08:40 1831154171707122432750_cache
-rw-r--r-- 1 jovyan users 0 Feb 5 08:40 1831154171707122432750_lock
-rw-r--r-- 1 jovyan users 21889857 Feb 5 05:46 20226180011707111990941_cache
-rw-r--r-- 1 jovyan users 0 Feb 5 05:46 20226180011707111990941_lock
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/opt/spark-3.4.2-bin-hadoop3/work-dir/ZEEL__EQ__NSE__NSE__MINUTE.csv
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:304)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:244)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:332)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:208)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:291)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:291)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:287)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2328)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1019)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1018)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: Input path does not exist: file:/opt/spark-3.4.2-bin-hadoop3/work-dir/ZEEL__EQ__NSE__NSE__MINUTE.csv
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:278)
... 30 more
The file /opt/spark-3.4.2-bin-hadoop3/work-dir/ZEEL__EQ__NSE__NSE__MINUTE.csv is available at the same path on both executors.
Am I missing something or doing something wrong?
The "file://" prefix is not needed when calling sc.addFile.
Try calling it without the prefix:
sc.addFile("/opt/spark-3.4.2-bin-hadoop3/work-dir/stock-market-india/FullDataCsv/ZEEL__EQ__NSE__NSE__MINUTE.csv")