Unable to save an RDD object to a text file with PySpark


I am trying to write a Spark program that reads airport data from the airports.text file, finds all the airports located in the United States, and writes each airport's name and city to out/airports_in_usa.text. However, I get an error when I call .saveAsTextFile to save the RDD object as a text file so I can view the result.

In other words, I cannot save and view the RDD data.

I created a .py file containing the module/helper Utils.COMMA_DELIMITER and added the folder containing this .py file to my PYTHONPATH so that I can import the module.
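For reference, a minimal Utils.py sketch along these lines is enough to provide COMMA_DELIMITER (an assumption: a compiled regex that splits on commas outside double-quoted fields, which is consistent with the quoted "United States" comparison in the code below):

import re

# Split on a comma only when it is followed by an even number of double
# quotes, i.e. when the comma is not inside a quoted field.
COMMA_DELIMITER = re.compile(''',(?=(?:[^"]*"[^"]*")*[^"]*$)''')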

My code and the error are shown below:

from pyspark import SparkContext, SparkConf
import Utils  # this is the module in the .py file

#Create a function
# Helper: split a CSV line on commas (outside quotes) and return "name, city"
def splitComma(line: str):
    splits = Utils.COMMA_DELIMITER.split(line)
    return "{}, {}".format(splits[1], splits[2])

conf = SparkConf().setAppName("airports").setMaster("local[*]")
sc = SparkContext(conf=conf)


# Load the dataset; textFile automatically creates an RDD
airports = sc.textFile("airports.text")

# Filter the data to find all airports in the USA using a lambda function
airportsInUSA = airports.filter(lambda line: Utils.COMMA_DELIMITER.split(line)[3] == "\"United States\"")

airportsNameAndCityNames = airportsInUSA.map(splitComma)

airportsNameAndCityNames.saveAsTextFile("out/airports_in_usa.text")
--------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
c:\Users\user\Documents\Python Programming\PySpark_for_Big_Data_3.ipynb Cell 18 line 2
      1 # Save output in a new text file
----> 2 airportsNameAndCityNames.saveAsTextFile("out/airports_in_usa.text")
      3 # displays an error

File C:\spark\spark-3.4.2-bin-hadoop3\python\pyspark\rdd.py:3406, in RDD.saveAsTextFile(self, path, compressionCodecClass)
   3404     keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path, compressionCodec)
   3405 else:
-> 3406     keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)

File C:\spark\spark-3.4.2-bin-hadoop3\python\lib\py4j-0.10.9.7-src.zip\py4j\java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File C:\spark\spark-3.4.2-bin-hadoop3\python\lib\py4j-0.10.9.7-src.zip\py4j\protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
...
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:842)

I urgently need help.

My software versions are:
Python 3.11.8
PySpark 3.4.2
Spark 3.4.2
Java 17
I am using the VS Code IDE.

python apache-spark pyspark rdd
1 Answer

Try this, it works for me:

from pyspark import SparkContext, SparkConf
import os
import shutil

import Utils

conf = SparkConf().setAppName("Airports in USA").setMaster("local[*]")
sc = SparkContext(conf=conf)

def splitComma(line: str):
    splits = Utils.COMMA_DELIMITER.split(line)
    return "{}, {}".format(splits[1], splits[2])

airports = sc.textFile("airports.text")

airportsInUSA = airports.filter(lambda line: Utils.COMMA_DELIMITER.split(line)[3] == "\"United States\"")
print("Number of airports in the USA: ", airportsInUSA.count())  # Debug: Count the entries

airportsNameAndCityNames = airportsInUSA.map(splitComma)

output_dir = "out/airports_in_usa.text"
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)  # Remove the directory if it exists; saveAsTextFile fails on an existing path

airportsNameAndCityNames.saveAsTextFile(output_dir)
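Note that saveAsTextFile treats its argument as an output directory and writes part-* files inside it; Hadoop raises an error if that directory already exists, which is what the rmtree cleanup above guards against on re-runs. To spot-check the result afterwards, a quick sketch (assuming the save succeeded):

# Read the saved part files back and print a few lines as a sanity check
saved = sc.textFile(output_dir)
print(saved.take(5))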