I am writing a simple CSV-to-Parquet conversion. The CSV file contains several timestamp columns, so I get a type error when writing. To work around it, I tried this line to identify the timestamp cols and run to_timestamp on them:
rdd = sc.textFile("../../../Downloads/test_type.csv").map(lambda line: [to_timestamp(i) if instr(i,"-")==5 else i for i in line.split(",")])
This throws the following error:
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:/yy/xx/Documents/gg/csv_to_parquet/csv_to_parquet.py", line 55, in <lambda>
rdd = sc.textFile("../../../test/test.csv").map(lambda line: [to_timestamp(i) if (instr(i,"-")==5) else i for i in line.split(",")])
AttributeError: 'NoneType' object has no attribute '_jvm'
Any ideas how to accomplish this?
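For what it's worth, the AttributeError occurs because pyspark.sql.functions such as to_timestamp and instr build JVM Column expressions: they only work inside DataFrame operations on the driver, while an rdd.map lambda runs on executors where the active SparkContext (and its _jvm handle) is None. A plain-Python sketch of the same heuristic, usable inside the map, might look like this (a hypothetical helper, not the asker's code; note instr is 1-based, so instr(i, "-") == 5 corresponds to i.find("-") == 4):

```python
from datetime import datetime

def parse_field(value):
    # A dash after a 4-digit year suggests a timestamp like
    # "1970-01-01 00:00:01.000" (the question's instr(i, "-") == 5 test,
    # translated to Python's 0-based find).
    if value.find("-") == 4:
        try:
            return datetime.strptime(value, "%Y-%m-%d %H:%M:%S.%f")
        except ValueError:
            return value  # not actually a timestamp; keep the string
    return value

# Hypothetical usage inside the map (path as in the question):
# rdd = sc.textFile("../../../Downloads/test_type.csv") \
#         .map(lambda line: [parse_field(i) for i in line.split(",")])
```

This runs entirely in Python on the executors, so no JVM context is needed.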
==========================================>
Made some progress today: I am now writing the Parquet file, but when I query the data I get a binary-vs-timestamp error:
HIVE_BAD_DATA: Field header__timestamp's type BINARY in parquet is incompatible with type timestamp defined in table schema
I modified the code to read every column as StringType initially and then cast the data types in the DataFrame:
sc = SparkContext(appName="CSV2Parquet")
sqlContext = SQLContext(sc)

schema = StructType([
    StructField("header__change_seq", StringType(), True),
    StructField("header__change_oper", StringType(), True),
    StructField("header__change_mask", StringType(), True),
    StructField("header__stream_position", StringType(), True),
    StructField("header__operation", StringType(), True),
    StructField("header__transaction_id", StringType(), True),
    StructField("header__timestamp", StringType(), True),
    StructField("l_en_us", StringType(), True),
    StructField("priority", StringType(), True),
    StructField("typecode", StringType(), True),
    StructField("retired", StringType(), True),
    StructField("name", StringType(), True),
    StructField("id", StringType(), True),
    StructField("description", StringType(), True),
    StructField("l_es_ar", StringType(), True),
    StructField("adw_updated_ts", StringType(), True),
    StructField("adw_process_id", StringType(), True)
])

rdd = sc.textFile("../../../Downloads/pctl_jobdatetype.csv").map(lambda line: line.split(","))
df = sqlContext.createDataFrame(rdd, schema)
df2 = df.withColumn('header__timestamp', df['header__timestamp'].cast('timestamp'))
df2 = df.withColumn('adw_updated_ts', df['adw_updated_ts'].cast('timestamp'))
df2 = df.withColumn('priority', df['priority'].cast('double'))
df2 = df.withColumn('id', df['id'].cast('double'))
df2.write.parquet('../../../Downloads/input-parquet')
Sample data:
"header__change_seq","header__change_oper","header__change_mask","header__stream_position","header__operation","header__transaction_id","header__timestamp","l_en_us","priority","typecode","retired","name","id","description","l_es_ar","adw_updated_ts","adw_process_id"
,"I",,,"IDL",,"1970-01-01 00:00:01.000","Effective Date","10.0","Effective","0","Effective Date","10001.0","Effective Date","Effective Date","2020-02-16 15:45:07.432","fb69d6f6-06fa-4c93-b8d6-bb7c7229ee88"
,"I",,,"IDL",,"1970-01-01 00:00:01.000","Written Date","20.0","Written","0","Written Date","10002.0","Written Date","Written Date","2020-02-16 15:45:07.432","fb69d6f6-06fa-4c93-b8d6-bb7c7229ee88"
,"I",,,"IDL",,"1970-01-01 00:00:01.000","Reference Date","30.0","Reference","0","Reference Date","10003.0","Reference Date","Reference Date","2020-02-16 15:45:07.432","fb69d6f6-06fa-4c93-b8d6-bb7c7229ee88"
After I changed the DataFrame name to df2 in lines 3-6 of the snippet, it seems to work fine, and Athena returns results as well.
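The name change matters because withColumn returns a new DataFrame: when each of those lines started from df, every assignment discarded the previous cast and df2 kept only the last one. A minimal pure-Python stand-in (a hypothetical Frame class, not Spark) illustrates the pattern:

```python
# Hypothetical stand-in for a Spark DataFrame: with_column returns a
# NEW object, just as DataFrame.withColumn returns a new DataFrame.
class Frame:
    def __init__(self, cols):
        self.cols = dict(cols)

    def with_column(self, name, dtype):
        new = Frame(self.cols)
        new.cols[name] = dtype
        return new


df = Frame({"priority": "string", "id": "string"})

# Buggy pattern from the original snippet: both lines derive from df,
# so the priority cast is thrown away.
df2 = df.with_column("priority", "double")
df2 = df.with_column("id", "double")
assert df2.cols["priority"] == "string"  # cast lost

# Fixed pattern (chain on df2), as described above.
df2 = df.with_column("priority", "double")
df2 = df2.with_column("id", "double")
assert df2.cols == {"priority": "double", "id": "double"}
```

The same chaining applies to the real code: each cast after the first should read from and assign to df2.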