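I want to run my PySpark program through functions. I put all the operations inside a main function and then tried to call that main function, but it doesn't work. I have two DataFrames: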
+---+----+-----+
| id|name|marks|
+---+----+-----+
|  1|   A|    1|
|  1|   A|    2|
|  1|   A|    3|
|  2|   B|    3|
|  2|   B|    5|
+---+----+-----+
+---+---+----+
| id|sub|mrks|
+---+---+----+
|  1|  A|  10|
|  1|  B|  20|
|  1|  C|  30|
|  2|  B|  35|
|  2|  C|  57|
+---+---+----+
My goal is to read each DataFrame and store it locally.
My code is:
from __future__ import division,print_function
from pyspark.sql import functions as F
from pyspark.sql import types as T
import numpy as np
from pyspark.ml.feature import HashingTF, IDF, Normalizer
from collections import defaultdict as D
import datetime
from collections import Counter
import sys
import math
import pyspark
import time
from datetime import datetime
from math import exp
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark').config("spark.sql.session.timeZone", "GMT").getOrCreate()
def main(input_file, output_path, step):
    if step == 1:
        # read the 1st DataFrame and write it out as gzip-compressed parquet
        df1 = spark.read.parquet(input_file)
        df1.write.parquet(output_path, mode="overwrite", compression="gzip")
    elif step == 2:
        # read the 2nd DataFrame and write it out the same way
        df2 = spark.read.parquet(input_file)
        df2.write.parquet(output_path, mode="overwrite", compression="gzip")
if __name__ == '__main__':
    input_file = sys.argv[0]
    output_path = sys.argv[1]
    step = sys.argv[2]
input_file = "/Users/pallavi/Documents/project/test_data1.parquet/"
output_path = "/Users/pallavi/Documents/project/output/"
step = "2"
main(input_file, output_path, step)
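I don't know exactly what your error or problem is, but I assume your main() method is never being called.
This may be caused by incorrectly indented lines after the argument handling. (Tip: Python provides command-line support through libraries such as argparse.)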
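Following the argparse tip, here is a minimal sketch of how the three positional arguments could be parsed. The helper name `parse_args` and the `choices=[1, 2]` restriction are assumptions for illustration. Declaring `type=int` also means `step` arrives as an integer, so a check such as `step == 1` can actually match.

```python
import argparse


def parse_args(argv=None):
    """Parse input path, output path and step (hypothetical helper).

    When argv is None, argparse reads sys.argv[1:], so the script path
    in sys.argv[0] is skipped automatically.
    """
    parser = argparse.ArgumentParser(
        description="Read a parquet dataset and write it to a local path.")
    parser.add_argument("input_file", help="parquet input to read")
    parser.add_argument("output_path", help="directory for the gzip parquet output")
    # type=int converts "2" to 2 before choices are checked
    parser.add_argument("step", type=int, choices=[1, 2],
                        help="which DataFrame to process")
    return parser.parse_args(argv)


args = parse_args([
    "/Users/pallavi/Documents/project/test_data1.parquet/",
    "/Users/pallavi/Documents/project/output/",
    "2",
])
print(args.input_file, args.output_path, args.step)
```

In the real script you would call `parse_args()` with no argument inside the `if __name__ == '__main__':` block and pass `args.input_file`, `args.output_path` and `args.step` to `main()`.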
Now for the actual answer to your question: try indenting the lines
input_file = "/Users/pallavi/Documents/project/test_data1.parquet/"
output_path = "/Users/pallavi/Documents/project/output/"
step = "2"
main(input_file, output_path, step)
directly beneath the preceding ones, so that your closing section looks like this:
if __name__ == '__main__':
    input_file = sys.argv[0]
    output_path = sys.argv[1]
    step = sys.argv[2]
    input_file = "/Users/pallavi/Documents/project/test_data1.parquet/"
    output_path = "/Users/pallavi/Documents/project/output/"
    step = "2"
    main(input_file, output_path, step)
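Separately from the indentation, note that even with this layout `step` is the string `"2"` (anything read from `sys.argv` is a string too), while `main()` compares it with the integers `1` and `2`. In Python `"2" == 2` is `False`, so neither branch would run and nothing would be written. Also, `sys.argv[0]` is the script's own path, so user-supplied arguments start at `sys.argv[1]`. A minimal sketch, using a hypothetical stand-in for `main()` that reports which branch it took instead of calling Spark:

```python
def main(input_file, output_path, step):
    """Hypothetical stand-in for the question's main(): returns the branch taken."""
    if step == 1:
        return "step 1"
    elif step == 2:
        return "step 2"
    return None  # no branch matched, so nothing would be read or written


print(main("in.parquet", "out/", "2"))       # None, because "2" == 2 is False
print(main("in.parquet", "out/", int("2")))  # step 2
```

Converting once with `int(step)` (or letting argparse do it with `type=int`) before calling `main()` keeps the comparisons inside `main()` unchanged.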
A very simple example for reference:
def main():
    print("Hello World!")

if __name__ == "__main__":
    main()
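A sketch of what the guard does, assuming the example above is saved as a file: `runpy.run_path` executes it once with `run_name="__main__"` (as when started with `python script.py`) and once under an ordinary module name (as happens on import). The script body is the example above with `print` swapped for appending to a list named `calls`; that swap and the file name `demo_script.py` are hypothetical, chosen only so the result can be checked.

```python
import os
import runpy
import tempfile

# The example script, recording the call in a list instead of printing.
SCRIPT = '''
def main():
    calls.append("main ran")

if __name__ == "__main__":
    main()
'''


def run_script(run_name):
    """Run SCRIPT from a temporary file under the given __name__; return the calls made."""
    calls = []
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo_script.py")
        with open(path, "w") as f:
            f.write(SCRIPT)
        # init_globals injects `calls` into the script's module namespace
        runpy.run_path(path, init_globals={"calls": calls}, run_name=run_name)
    return calls


print(run_script("__main__"))     # ['main ran']
print(run_script("demo_script"))  # []
```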
I think the way you have laid out your code makes the lines I quoted first unreachable.
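To illustrate what "unreachable" means here, a sketch that executes two hypothetical layouts of the same script with `exec`. In the first, the guard and the call are indented inside `main()` itself, so they would only run once `main()` is already running, and nothing ever starts it. In the second, they sit at module level. The `calls` list is an assumption used only to observe whether `main()` ran.

```python
def run_layout(source):
    """Execute a script body as if run directly; return the calls made to main()."""
    calls = []
    exec(source, {"__name__": "__main__", "calls": calls})
    return calls


# Guard and call indented one level too far: they belong to main's body.
UNREACHABLE = '''
def main():
    calls.append("main ran")
    if __name__ == "__main__":
        main()
'''

# Guard at module level: main() is called once when the script runs.
FIXED = '''
def main():
    calls.append("main ran")

if __name__ == "__main__":
    main()
'''

print(run_layout(UNREACHABLE))  # []
print(run_layout(FIXED))        # ['main ran']
```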