PySpark programming with a main function

Question (votes: 0, answers: 1)

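I want to run my PySpark program through functions. I put all of the logic inside a main function and then tried to call that main function, but it doesn't work. I have two DataFrames: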

+----+----+-----+
|id  |name|marks|
+----+----+-----+
|   1|   A|    1|
|   1|   A|    2|
|   1|   A|    3|
|   2|   B|    3|
|   2|   B|    5|
+----+----+-----+


+----+----+----+
|id  |sub |mrks|
+----+----+----+
|   1|   A|  10|
|   1|   B|  20|
|   1|   C|  30|
|   2|   B|  35|
|   2|   C|  57|
+----+----+----+

My goal is to read each DataFrame and store it locally.

My code:

from __future__ import division,print_function
from pyspark.sql import functions as F
from pyspark.sql import types as T
import numpy as np
from pyspark.ml.feature import HashingTF, IDF, Normalizer
from collections import defaultdict as D
import datetime
from collections import Counter
import sys
import math
import pyspark
import time
from datetime import datetime
from math import exp

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('spark').config("spark.sql.session.timeZone", "GMT").getOrCreate()

def main(input_file, output_path, step):

    if step == 1:
        # read the first DataFrame and write it out as gzip-compressed Parquet
        df1 = spark.read.parquet(input_file)
        df1.write.parquet(output_path, mode="overwrite", compression="gzip")

    elif step == 2:
        # read the second DataFrame and write it out the same way
        df2 = spark.read.parquet(input_file)
        df2.write.parquet(output_path, mode="overwrite", compression="gzip")

if __name__ == '__main__':
    input_file = sys.argv[0]
    output_path = sys.argv[1]
    step = sys.argv[2]

input_file = "/Users/pallavi/Documents/project/test_data1.parquet/"
output_path = "/Users/pallavi/Documents/project/output/"
step = "2"

main(input_file, output_path, step)
Tags: pyspark pyspark-sql pyspark-dataframes

Answer (votes: 0)

I don't know exactly what your error or problem is, but I assume your main() function is never called.

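This may be caused by the incorrect indentation of the lines after the argument handling. (Tip: Python has libraries such as argparse for command-line support.)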
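A minimal sketch of that tip using argparse, assuming the same three positional arguments as the question (input path, output path, step). The parse_args helper and the sample paths are hypothetical. Declaring step with type=int and choices=[1, 2] turns the command-line string into an integer, so it can actually match the step == 1 / step == 2 checks in main().

```python
import argparse


def parse_args(argv=None):
    """Parse the three positional arguments; argv=None makes argparse read sys.argv[1:]."""
    parser = argparse.ArgumentParser(description="Read a Parquet dataset and write it locally.")
    parser.add_argument("input_file", help="path of the Parquet input")
    parser.add_argument("output_path", help="directory for the Parquet output")
    # type=int converts "2" to 2, so main()'s integer comparisons can match;
    # choices rejects anything other than 1 or 2 with a usage error.
    parser.add_argument("step", type=int, choices=[1, 2], help="which step to run")
    return parser.parse_args(argv)


# Example with an explicit argument list (hypothetical paths).
args = parse_args(["test_data1.parquet", "output/", "2"])
print(args.input_file, args.output_path, args.step)
```

In the real script, the code under if __name__ == '__main__': would call parse_args() with no argument and pass args.input_file, args.output_path and args.step on to main().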

Now for the actual answer to your question: try indenting the lines

input_file = "/Users/pallavi/Documents/project/test_data1.parquet/"
output_path = "/Users/pallavi/Documents/project/output/"
step = "2"

main(input_file, output_path, step)

so that they sit directly under the preceding block, making your ending look like this:

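if __name__ == '__main__':
    # sys.argv[0] is the script path; the user's arguments start at index 1
    input_file = sys.argv[1]
    output_path = sys.argv[2]
    step = int(sys.argv[3])  # main() compares step with the integers 1 and 2

    # hard-coded values for local testing; these override the command-line arguments
    input_file = "/Users/pallavi/Documents/project/test_data1.parquet/"
    output_path = "/Users/pallavi/Documents/project/output/"
    step = 2

    main(input_file, output_path, step)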
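Independently of the indentation, two details decide whether main() does anything. sys.argv[0] is the path of the script itself, so the user's arguments start at sys.argv[1]; and every element of sys.argv is a string, while main() compares step with the integers 1 and 2. Because "2" == 2 is False in Python, calling main(input_file, output_path, "2") matches neither branch and silently does nothing. The sketch below reproduces only that dispatch logic, without Spark; choose_step and read_cli are hypothetical helper names used for illustration.

```python
def choose_step(step):
    """Mirror the if/elif in the question's main(); return the branch that would run."""
    if step == 1:
        return "step 1: copy df1"
    elif step == 2:
        return "step 2: copy df2"
    return None  # no branch matched, so main() would silently do nothing


def read_cli(argv):
    """Take the three user arguments, skipping argv[0] (the script path)."""
    input_file, output_path, step = argv[1:4]
    return input_file, output_path, int(step)  # argv entries are always str


print(choose_step("2"))  # → None: the string "2" never equals the int 2
print(choose_step(2))    # → step 2: copy df2
print(read_cli(["job.py", "in.parquet", "out/", "2"]))  # → ('in.parquet', 'out/', 2)
```

In the script itself this would become main(*read_cli(sys.argv)) under the guard, or simply step = 2 (an int) while the paths stay hard-coded for local testing.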

A very simple example, for reference:

def main():
    print("Hello World!")

if __name__ == "__main__":
    main()


I think the way your code is laid out prevents the lines I quoted first from being reached.
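To see which lines the guard actually controls, the sketch below executes the same small source text twice with exec: once with __name__ set to "__main__" (as when the file is run as a script) and once with a module name (as when it is imported). The SOURCE string, the run_as helper and the calls list are illustrative only. Only code inside the if block depends on __name__; top-level lines, such as the hard-coded main(...) call at the end of the question, run in both cases.

```python
SOURCE = '''
def main(tag):
    calls.append(tag)

if __name__ == "__main__":
    main("inside guard")

main("top level")
'''


def run_as(module_name):
    """Execute SOURCE with the given __name__ and return the main() calls it made."""
    calls = []
    namespace = {"__name__": module_name, "calls": calls}
    exec(SOURCE, namespace)
    return calls


print(run_as("__main__"))  # → ['inside guard', 'top level'] (run as a script)
print(run_as("my_job"))    # → ['top level'] (imported as a module)
```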
