I'm trying to create a Spark program that reads airport data from the airport.text file, finds all airports located in the USA, and writes each airport name and city name to an out_airport_in_usa.text file, but I get an error when I call
.saveAsTextFile
to save the RDD as a text file for inspection. In other words, I can't save and view the RDD data.

I created a .py file containing the module/function Utils.COMMA_DELIMITER and added the folder containing this .py file to my PYTHONPATH so that I can import the module.

My code and the error are as follows:
from pyspark import SparkContext, SparkConf
import Utils  # this is the module in the .py file

# Create a function that keeps only the airport name and city name fields
def splitComma(line: str):
    splits = Utils.COMMA_DELIMITER.split(line)
    return "{}, {}".format(splits[1], splits[2])

conf = SparkConf().setAppName("airports").setMaster("local[*]")
sc = SparkContext(conf=conf)

# Import the dataset, auto-creating an RDD
airports = sc.textFile("airports.text")

# Filter the data to find all airports in the USA using a lambda function
airportsInUSA = airports.filter(lambda line: Utils.COMMA_DELIMITER.split(line)[3] == "\"United States\"")
airportsNameAndCityNames = airportsInUSA.map(splitComma)
airportsNameAndCityNames.saveAsTextFile("out/airports_in_usa.text")
--------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
c:\Users\user\Documents\Python Programming\PySpark_for_Big_Data_3.ipynb Cell 18 line 2
1 # Save output in a new text file
----> 2 airportsNameAndCityNames.saveAsTextFile("out/airports_in_usa.text")
3 # displays an error
File C:\spark\spark-3.4.2-bin-hadoop3\python\pyspark\rdd.py:3406, in RDD.saveAsTextFile(self, path, compressionCodecClass)
3404 keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path, compressionCodec)
3405 else:
-> 3406 keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
File C:\spark\spark-3.4.2-bin-hadoop3\python\lib\py4j-0.10.9.7-src.zip\py4j\java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File C:\spark\spark-3.4.2-bin-hadoop3\python\lib\py4j-0.10.9.7-src.zip\py4j\protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
...
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:842)
(Output truncated.)
I urgently need help.

My software specs are:

Python 3.11.8
PySpark 3.4.2
Spark 3.4.2
Java 17

I'm using the VS Code IDE.
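For reference, the Utils module I import is just a quote-aware comma splitter for the CSV-like airport data. A rough sketch of what such a Utils.py contains; the exact pattern in your own file may differ:

```python
# Utils.py - illustrative sketch, not necessarily the asker's exact file
import re

# Splits on commas that are NOT inside double quotes, so a quoted field
# such as "Goroka, Papua New Guinea" is kept as one field.
COMMA_DELIMITER = re.compile(''',(?=(?:[^"]*"[^"]*")*[^"]*$)''')
```

The lookahead only matches a comma when an even number of double quotes remains to the end of the line, i.e. the comma is outside any quoted field.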
Try this, it worked for me:
from pyspark import SparkContext, SparkConf
import os
import shutil
import Utils

conf = SparkConf().setAppName("Airports in USA").setMaster("local[*]")
sc = SparkContext(conf=conf)

def splitComma(line: str):
    splits = Utils.COMMA_DELIMITER.split(line)
    return "{}, {}".format(splits[1], splits[2])

airports = sc.textFile("airports.text")
airportsInUSA = airports.filter(lambda line: Utils.COMMA_DELIMITER.split(line)[3] == "\"United States\"")
print("Number of airports in the USA: ", airportsInUSA.count())  # Debug: count the entries

airportsNameAndCityNames = airportsInUSA.map(splitComma)

output_dir = "out/airports_in_usa.text"
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)  # Remove the directory if it exists; saveAsTextFile fails on an existing path
airportsNameAndCityNames.saveAsTextFile(output_dir)
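Also note that saveAsTextFile writes a directory named out/airports_in_usa.text containing part-00000-style files, not a single text file, and it fails outright if that directory already exists, which is a common cause of a Py4JJavaError at exactly that call. Here is a standalone sketch of the cleanup-and-inspect pattern (the helper names are my own, and Spark itself is not needed to run it):

```python
import glob
import os
import shutil

def clean_output_dir(path):
    # saveAsTextFile raises an error if the target directory already exists,
    # so remove any leftovers from a previous run first.
    if os.path.exists(path):
        shutil.rmtree(path)

def read_saved_output(path):
    # saveAsTextFile writes a DIRECTORY of part-* files, not one text file;
    # to inspect the result, concatenate the parts in order.
    lines = []
    for part in sorted(glob.glob(os.path.join(path, "part-*"))):
        with open(part) as f:
            lines.extend(f.read().splitlines())
    return lines
```

With this you can call clean_output_dir before saving and read_saved_output afterwards to verify the contents without opening the part files by hand.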