Pyspark将数据帧写入avro,保持键值的序列。

问题描述 投票:0回答:1

我试图使用pyspark读取一个avro文件,并根据某些键对其中一列进行排序。在我的avro文件中,有一列包含了一个叫做 MapType 数据,我需要根据键来排序。测试的avro中只有一行,实体列有一个 MapType 数据。我的目的是将输出结果写回avro文件,但键的顺序是一样的。不幸的是,我无法实现这一点,不知道这在avro中是否可能?这是我的代码(我已经创建了一个笔记本来测试它)。

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, lit, to_json, create_map, from_json
from pyspark.sql import Row
from pyspark import StorageLevel
import json
from pyspark.sql.types import StringType
import shutil
from pyspark.sql.types import MapType, ArrayType, StringType, StructType, StructField

spark = SparkSession     .builder     .appName("AvroTest")     .config("spark.jars.packages", "org.apache.spark:spark-avro_2.11:2.4.0")     .getOrCreate()

df = spark.read.format("avro").load("part-r-00000.avro")
schema = df.select('entities').schema
sch = schema.fields[0].dataType
print(df.schema)

@udf
def udf_func(line):
    for entkey,subdict in line.items():
        subdictnew = subdict.asDict(True)
        sorteddict = dict(sorted(subdictnew['entities'].items(), key=lambda a: int(a[0])))
        subdictnew['entities'] = sorteddict
        line[entkey] = subdictnew
    return str(line)

dfnew = df.withColumn('entities', from_json(udf_func(df['entities']), sch)).persist(StorageLevel.MEMORY_ONLY_SER)
#dfnew.show()
d = dfnew.dtypes
newschema = dfnew.schema

try:
    shutil.rmtree('testavro/sortedData')
except:
    print('folder already removed')
dfnew.write.format('avro').save('ctipavro/sortedData')
dfnew.show(1, False)

上面的代码写回了avro,但以一种未排序的方式。最后一行是以排序的方式打印 "实体 "的数据框列记录。

|37321431529|37321431529|1561020714|[trade -> [trade, [59489777 -> [TRADE_ASSOC_TO_DB_DT -> 2011-09-30, FCBA_IN -> N, ACCT_BALANCE_AM -> 0, CII_BKRPT_CD ->   , CREDIT_AM_EXCP_CD -> 6, FRAUD_IN -> N, ACCT_REPORTED_DT -> 2019-04-01, DATA_USAGE_EXCL_IN -> N, CII_REAFF_CD ->   , DEDUP_RANK_CD -> 0, NY_DISPLAY_RULE_IN -> N, ACCT_HIGH_BALANCE_AM_EXCP_CD -> 6, ACCT_PAYMENT_AM -> 13, EXCLUSION_CD -> 0, KOB_CD -> BB, PAYMENT_GRID_2 -> 0000000-0-0000-00-00000..............

请注意,这里我打印的是已经排序的数据框输出。但是当我尝试将保存的avro文件读回一个新的数据框中,并做一个 show()钥匙又是未排序的。请注意,第一个键为 trade -> [trade,应该是 59489777而它是另一种东西 - 51237292611. 对了,这个键在我第一次读取输入avro的时候就出现了,不知道为什么在排序和回写之后,它先打印的是同一个键。

dffresh = spark.read.format("avro").load("testavro/sortedData")
schema = dffresh.schema
print(schema)
dffresh.show(1, False)

输出。

|37321431529|37321431529|1561020714|[trade -> [trade, [51237292611 -> [TRADE_ASSOC_TO_DB_DT -> 2014-09-20, FCBA_IN -> N, ACCT_BALANCE_AM -> 0, CII_BKRPT_CD ->   , CREDIT_AM_EXCP_CD -> 6, FRAUD_IN -> N, ACCT_REPORTED_DT -> 2019-05-01, DATA_USAGE_EXCL_IN -> N, CII_REAFF_CD ->   , DEDUP_RANK_CD -> 0, NY_DISPLAY_RULE_IN -> N, ACCT_HIGH_BALANCE_AM_EXCP_CD -> 6, ACCT_PAYMENT_AM -> 0, EXCLUSION_CD -> 0, KOB_CD -> BC, PAYMENT_GRID_2 -> 000000C0000000..................................

我想请大家帮我解决这个问题。我试了很多方法,也找遍了多个SO的问题,都没有找到如何实现的线索。

python pyspark avro spark-avro
1个回答
0
投票

如果你的源数据是avro格式的,一般来说,最好的做法是将处理后的输出写成Parquet文件格式。你可以得到谓词推倒的好处,并总是可以处理选择性的列数。

但是如果你的处理过程中又要写成avro格式,那么列的顺序就不一定能保证,因为使用的数据结构是 Map. 您可以通过使用 select 函数,并按照您选择的顺序读取列。

© www.soinside.com 2019 - 2024. All rights reserved.