Spark 2.3.1 AWS EMR不返回某些列的数据,但仍适用于Athena / Presto和Spectrum

问题描述 投票:3回答:2

我在AWS EMR上使用Spark 2.3.1上的PySpark(Python 2.7.14)

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL data source example") \
    .config("hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
    .config("hive.exec.dynamic.partition", "true") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .config("spark.debug.maxToStringFields", 100) \
    .enableHiveSupport() \
    .getOrCreate()


spark.sql('select `message.country` from datalake.leads_notification where `message.country` is not null').show(10)

这不返回数据,找到0行。上表中每行的每个值都返回Null。数据存储在PARQUET中。

当我在AWS Athena / Presto或AWs Redshift Spectrum上运行相同的SQL查询时,我会正确返回所有列数据(大多数列值不为空)。

这是返回正确数据的Athena SQL和Redshift SQL查询:

select "message.country" from datalake.leads_notification where "message.country" is not null limit 10;

我在所有情况下都使用AWS Glue目录。上面的列未分区,但表在其他列上分区。我试着使用修复表,它没有帮助。即MSCK REPAIR TABLE datalake.leads_notification

我试过Schema Merge = True就像这样:

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL data source example") \
    .config("hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
    .config("hive.exec.dynamic.partition", "true") \
    .config("spark.sql.parquet.mergeSchema", "true") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .config("spark.debug.maxToStringFields", 200) \
    .enableHiveSupport() \
    .getOrCreate()

没有区别,即使有些列不为空,仍然每列的每个值都为空。

此列已添加为表的最后一列,因此大多数数据确实为null,但某些行不为null。该列最后列在目录中的列列表中,位于分区列的正上方。

然而,Athena / Presto检索所有非空值OK,Redshift Spectrum也是如此,但是唉EMR Spark 2.3.1 PySpark将此列的所有值都显示为“null”。 Spark中的所有其他列都可以正确检索。

任何人都可以帮我调试这个问题吗?

由于输出格式,Hive Schema很难在此处剪切和粘贴。

***CREATE TABLE datalake.leads_notification(
  message.environment.siteorigin string, 
  dcpheader.dcploaddateutc string, 
  message.id int, 
  message.country string, 
  message.financepackage.id string, 
  message.financepackage.version string)
PARTITIONED BY ( 
  partition_year_utc string, 
  partition_month_utc string, 
  partition_day_utc string, 
  job_run_guid string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://blahblah/leads_notification/leads_notification/'
TBLPROPERTIES (
  'CrawlerSchemaDeserializerVersion'='1.0', 
  'CrawlerSchemaSerializerVersion'='1.0', 
  'UPDATED_BY_CRAWLER'='weekly_datalake_crawler', 
  'averageRecordSize'='3136', 
  'classification'='parquet', 
  'compressionType'='none', 
  'objectCount'='2', 
  'recordCount'='897025', 
  'sizeKey'='1573529662', 
  'spark.sql.create.version'='2.2 or prior', 
  'spark.sql.sources.schema.numPartCols'='4', 
  'spark.sql.sources.schema.numParts'='3', 
  'spark.sql.sources.schema.partCol.0'='partition_year_utc', 
  'spark.sql.sources.schema.partCol.1'='partition_month_utc', 
  'spark.sql.sources.schema.partCol.2'='partition_day_utc', 
  'spark.sql.sources.schema.partCol.3'='job_run_guid', 
  'typeOfData'='file')***

最后3列在Spark中都有相同的问题:

message.country string, 
message.financepackage.id string, 
message.financepackage.version string

所有在Athena / Presto和Redshift Spectrum中使用相同的目录返回OK。

我为我的编辑道歉。

谢谢

apache-spark amazon-emr
2个回答
1
投票

做第5步架构检查:http://www.openkb.info/2015/02/how-to-build-and-use-parquet-tools-to.html

我的赌注是镶木地板定义中的这些新列名称是大写(而其他列名称是小写)或者镶木地板定义中的新列名称是小写(而其他列名称是大写)

Spark issues reading parquet files https://medium.com/@an_chee/why-using-mixed-case-field-names-in-hive-spark-sql-is-a-bad-idea-95da8b6ec1e0


0
投票
spark = SparkSession \
        .builder \
        .appName("Python Spark SQL data source example") \
        .config("hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
        .config("hive.exec.dynamic.partition", "true") \
        .config("spark.sql.parquet.mergeSchema", "true") \
        .config("spark.sql.hive.convertMetastoreParquet", "false") \
        .config("hive.exec.dynamic.partition.mode", "nonstrict") \
        .config("spark.debug.maxToStringFields", 200) \
        .enableHiveSupport() \
        .getOrCreate()

这是解决方案:请注意

 .config("spark.sql.hive.convertMetastoreParquet", "false") 

架构列都是小写的,架构是由AWS Glue创建的,而不是我的自定义代码所以我真的不知道导致问题的原因所以使用上面的内容可能是架构创建不在你的控制之下的安全默认设置。这是一个主要陷阱,恕我直言,所以我希望这将有助于其他人。感谢tooptoop4指出了这篇文章:

https://medium.com/@an_chee/why-using-mixed-case-field-names-in-hive-spark-sql-is-a-bad-idea-95da8b6ec1e0

© www.soinside.com 2019 - 2024. All rights reserved.