PySpark TypeErrors


Writing a simple CSV-to-Parquet conversion.

The CSV file contains several timestamp columns, so I hit a TypeError when trying to write.

To work around it, I tried this line to identify the timestamp columns and apply to_timestamp to them:

rdd = sc.textFile("../../../Downloads/test_type.csv").map(lambda line: [to_timestamp(i) if instr(i,"-")==5 else i for i in line.split(",")])

I get this error:

org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:/yy/xx/Documents/gg/csv_to_parquet/csv_to_parquet.py", line 55, in <lambda>
    rdd = sc.textFile("../../../test/test.csv").map(lambda line: [to_timestamp(i) if (instr(i,"-")==5) else i for i in line.split(",")])

AttributeError: 'NoneType' object has no attribute '_jvm'

Any ideas how to accomplish this?
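(Note: to_timestamp and instr are pyspark.sql.functions; they build Column expressions through the driver's JVM and cannot be applied to plain Python strings inside an RDD map lambda, where no active SparkContext exists on the executor, which is what produces the 'NoneType' object has no attribute '_jvm' error. A minimal sketch of the same idea using plain Python parsing inside the lambda instead, assuming the timestamp format shown in the sample data below:)

    from datetime import datetime

    def parse_field(raw):
        # Fields in the sample data are double-quoted, so strip quotes first.
        value = raw.strip('"')
        try:
            # The format string is an assumption based on the sample data.
            return datetime.strptime(value, "%Y-%m-%d %H:%M:%S.%f")
        except ValueError:
            return value

    rdd = sc.textFile("../../../Downloads/test_type.csv") \
            .map(lambda line: [parse_field(f) for f in line.split(",")])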

==========================================>

Version 2

Made some progress today: I'm now writing the Parquet files, but when I query the data I get a binary-versus-timestamp error:

HIVE_BAD_DATA: Field header__timestamp's type BINARY in parquet is incompatible with type timestamp defined in table schema

I modified the code to read everything as StringType initially, and then change the data types in the dataframe:

    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    from pyspark.sql.types import StructType, StructField, StringType

    sc = SparkContext(appName="CSV2Parquet")
    sqlContext = SQLContext(sc)

    schema = StructType\
    ([
        StructField("header__change_seq", StringType(), True),
        StructField("header__change_oper", StringType(), True),
        StructField("header__change_mask", StringType(), True),
        StructField("header__stream_position", StringType(), True),
        StructField("header__operation", StringType(), True),
        StructField("header__transaction_id", StringType(), True),
        StructField("header__timestamp", StringType(), True),
        StructField("l_en_us", StringType(), True),
        StructField("priority", StringType(), True),
        StructField("typecode", StringType(), True),
        StructField("retired", StringType(), True),
        StructField("name", StringType(), True),
        StructField("id", StringType(), True),
        StructField("description", StringType(), True),
        StructField("l_es_ar", StringType(), True),
        StructField("adw_updated_ts", StringType(), True),
        StructField("adw_process_id", StringType(), True)
        ])

    rdd = sc.textFile("../../../Downloads/pctl_jobdatetype.csv").map(lambda line: line.split(","))

    df = sqlContext.createDataFrame(rdd, schema)
    df2 = df.withColumn('header__timestamp', df['header__timestamp'].cast('timestamp'))
    df2 = df.withColumn('adw_updated_ts', df['adw_updated_ts'].cast('timestamp'))
    df2 = df.withColumn('priority', df['priority'].cast('double'))
    df2 = df.withColumn('id', df['id'].cast('double'))

    df2.write.parquet('../../../Downloads/input-parquet')
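(For reference, a quick schema check before writing would show which casts actually took effect; since every withColumn line above starts again from the original df, only the last cast survives in df2, so header__timestamp is still a string and gets stored as BINARY in Parquet:)

    # Inspect the column types before writing. In the code above, only the
    # final cast ('id') survives, because each assignment rebuilds df2 from df.
    df2.printSchema()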

Sample data:

"header__change_seq","header__change_oper","header__change_mask","header__stream_position","header__operation","header__transaction_id","header__timestamp","l_en_us","priority","typecode","retired","name","id","description","l_es_ar","adw_updated_ts","adw_process_id"
,"I",,,"IDL",,"1970-01-01 00:00:01.000","Effective Date","10.0","Effective","0","Effective Date","10001.0","Effective Date","Effective Date","2020-02-16 15:45:07.432","fb69d6f6-06fa-4c93-b8d6-bb7c7229ee88"
,"I",,,"IDL",,"1970-01-01 00:00:01.000","Written Date","20.0","Written","0","Written Date","10002.0","Written Date","Written Date","2020-02-16 15:45:07.432","fb69d6f6-06fa-4c93-b8d6-bb7c7229ee88"
,"I",,,"IDL",,"1970-01-01 00:00:01.000","Reference Date","30.0","Reference","0","Reference Date","10003.0","Reference Date","Reference Date","2020-02-16 15:45:07.432","fb69d6f6-06fa-4c93-b8d6-bb7c7229ee88"


pyspark pyspark-sql
1 Answer

After I changed the dataframe name to df2 in the withColumn lines below (lines 3-6), it seems to work fine, and Athena returns results as well.
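A sketch of what those corrected lines look like, with each cast chained on df2 instead of restarting from df, assuming the schema and rdd defined in the question:

    df = sqlContext.createDataFrame(rdd, schema)
    # Chain every cast on df2 so all four conversions survive; previously each
    # line restarted from df, so only the last cast ('id') took effect.
    df2 = df.withColumn('header__timestamp', df['header__timestamp'].cast('timestamp'))
    df2 = df2.withColumn('adw_updated_ts', df2['adw_updated_ts'].cast('timestamp'))
    df2 = df2.withColumn('priority', df2['priority'].cast('double'))
    df2 = df2.withColumn('id', df2['id'].cast('double'))

    df2.write.parquet('../../../Downloads/input-parquet')

With this change header__timestamp is written as a real timestamp in the Parquet schema, matching the table definition, so the HIVE_BAD_DATA error goes away.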
