我有一个 parquet 文件,由 Spark 作为外部表读取。
其中一列在 parquet 模式和 Spark 表中都定义为 int。
最近,我发现 int 太小,无法满足我的需求,因此我在新的 parquet 文件中将列类型更改为 long。 我还将 Spark 表中的类型更改为 bigint。
但是,当我尝试通过 Spark 读取旧的 parquet 文件(带有 int)作为外部表(带有 bigint)时,出现以下错误:
java.lang.UnsupportedOperationException:org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary
一种可能的解决方案是将旧镶木地板中的列类型更改为长整型,我在这里询问过:如何将镶木地板列类型从 int 更改为 long?,但由于我有大量数据,因此它非常昂贵。
另一种可能的解决方案是根据其架构将每个 parquet 文件读取到不同的 Spark 表,并创建新旧表的联合视图,这非常难看。
还有另一种方法可以从 parquet 中读取与 Spark 中一样长的 int 列吗?
使用 pyspark 你不能这样做吗
df = spark.read.parquet('path to parquet files')
只需更改数据框中列类型的转换
new_df = (df
.withColumn('col_name', col('col_name').cast(LongType()))
)
然后使用覆盖模式将新数据帧保存到同一位置
这主要发生在 .parquet 文件中的列为 double 或 float
时单行答案,设置
spark.conf.set("spark.sql.parquet.enableVectorizedReader","false")
TL;博士
当我们使用spark读取数据时,特别是
parquet
数据
data = spark.read.parquet(source_path)
Spark 尝试优化并从
.parquet
文件中读取矢量化格式的数据。即使我们进行显式数据类型转换,
new_data = data.withColumn(col_name, col(col_name).cast(TimestampType()))
spark 将在 parquet 中使用本机数据类型(无论 .parquet 文件中存在什么原始数据类型)。
由于数据和列类型不匹配,这会导致写入数据时出现问题
要解决此问题,请禁用矢量化阅读器。
要了解矢量化阅读器,请参阅下文
矢量化 Parquet 读取器可实现本机记录级过滤 使用下推过滤器,改进内存局部性和缓存 利用率。如果禁用矢量化 Parquet 读取器,可能会出现 对性能影响较小。如果你有的话,你应该只禁用它 源数据中的十进制类型列。
https://spark.apache.org/docs/latest/sql-data-sources-parquet.html