Reading data with a custom schema

Question · 1 vote · 2 answers

I have a pipe-delimited file with no header that I want to read into Spark.

The data looks like this:

100005072756|R|OTHER|8|125000|360|11/2000|01/2001|95|95|1|29|694|N|P|SF|1|P|MI|492|30|FRM||1|N
100006905529|C|FLAGSTAR BANK, FSB|7.875|99000|360|12/2000|02/2001|80|80|1|55|689|N|C|SF|1|P|CA|923||FRM|||N
100010275736|R|JPMORGAN CHASE BANK, NA|7.625|140000|360|12/2000|02/2001|21|21|1|12|796|N|R|SF|1|P|CO|803||FRM|||N
100015654434|B|FLAGSTAR BANK, FSB|7.625|102000|360|03/2001|05/2001|80|80|2|32|706|N|C|SF|1|P|WI|541||FRM|661||N
100020827204|R|OTHER|7.125|214000|360|01/2001|03/2001|80|80|1|43|731|N|P|SF|4|P|IL|604||FRM|||N
100027347659|R|OTHER|8|195000|360|12/2000|02/2001|74|74|2|42|720|N|C|SF|1|P|VA|236||FRM|696||N
100037532932|R|BANK OF AMERICA, N.A.|7.75|228000|360|12/2000|02/2001|95|95|2|38|642|N|P|SF|1|P|AL|352|30|FRM|664|2|N
100040777887|C|CITIMORTGAGE, INC.|7|145000|360|01/2001|03/2001|71|71|2|20|752|N|P|SF|1|P|CA|953||FRM|786||N
100041189347|R|FLEET NATIONAL BANK|7.875|150000|240|10/2000|12/2000|85|85|2|34|734|N|C|SF|1|P|CO|810|6|FRM||1|N
100043437231|R|JPMORGAN CHASE BANK, NA|6.875|275000|240|12/2000|02/2001|71|71|1|41|765|N|P|PU|1|P|VA|201||FRM|||N

Here is the code I am using:

from pyspark.sql.types import *

origSchema = StructType([
                         StructField("loan_id", StringType(), True),
                         StructField("origination_channel", StringType(), True),
                         StructField("seller_name", StringType(), True),
                         StructField("original_interest_rate", DoubleType(), True),
                         StructField("original_upb", DoubleType(), True),
                         StructField("original_loan_term", IntegerType(), True),
                         StructField("origination_date", DateType(), True),
                         StructField("first_payment_date", DateType(), True),
                         StructField("original_ltv", IntegerType(), True),
                         StructField("original_cltv", IntegerType(), True),
                         StructField("number_of_borrowers", IntegerType(), True),
                         StructField("original_dti", IntegerType(), True),
                         StructField("borrower_fico_at_origination", IntegerType(), True),
                         StructField("first_time_home_buyer_indicator", StringType(), True),
                         StructField("loan_purpose", StringType(), True),
                         StructField("property_type", StringType(), True),
                         StructField("number_of_units", StringType(), True),
                         StructField("occupancy_type", StringType(), True),
                         StructField("property_state", StringType(), True),
                         StructField("zip_code_short", StringType(), True),
                         StructField("primary_mortgage_insurance_percent", IntegerType(), True),
                         StructField("product_type", StringType(), True),
                         StructField("coborrower_fico_at_origination", IntegerType(), True),
                         StructField("mortgage_insurance_type", StringType(), True),
                         StructField("relocation_mortgage_indicator", StringType(), True)
])

df = spark.read.load("/mnt/mi-sa-armor/INPUT/FANNIEMAE/2001Q1/Acquisition_2001Q1.txt", format="csv", sep="|", header="false", schema=origSchema)

Inspecting the dataframe:

+-------+-------------------+-----------+----------------------+------------+------------------+----------------+------------------+------------+-------------+-------------------+------------+----------------------------+-------------------------------+------------+-------------+---------------+--------------+--------------+--------------+----------------------------------+------------+------------------------------+-----------------------+-----------------------------+
|loan_id|origination_channel|seller_name|original_interest_rate|original_upb|original_loan_term|origination_date|first_payment_date|original_ltv|original_cltv|number_of_borrowers|original_dti|borrower_fico_at_origination|first_time_home_buyer_indicator|loan_purpose|property_type|number_of_units|occupancy_type|property_state|zip_code_short|primary_mortgage_insurance_percent|product_type|coborrower_fico_at_origination|mortgage_insurance_type|relocation_mortgage_indicator|
+-------+-------------------+-----------+----------------------+------------+------------------+----------------+------------------+------------+-------------+-------------------+------------+----------------------------+-------------------------------+------------+-------------+---------------+--------------+--------------+--------------+----------------------------------+------------+------------------------------+-----------------------+-----------------------------+
|   null|               null|       null|                  null|        null|              null|            null|              null|        null|         null|               null|        null|                        null|                           null|        null|         null|           null|          null|          null|          null|                              null|        null|                          null|                   null|                         null|
|   null|               null|       null|                  null|        null|              null|            null|              null|        null|         null|               null|        null|                        null|                           null|        null|         null|           null|          null|          null|          null|                              null|        null|                          null|                   null|                         null|
|   null|               null|       null|                  null|        null|              null|            null|              null|        null|         null|               null|        null|                        null|                           null|        null|         null|           null|          null|          null|          null|                              null|        null|                          null|                   null|                         null|
|   null|               null|       null|                  null|        null|              null|            null|              null|        null|         null|               null|        null|                        null|                           null|        null|         null|           null|          null|          null|          null|                              null|        null|                          null|                   null|                         null|
|   null|               null|       null|                  null|        null|              null|            null|              null|        null|         null|               null|        null|                        null|                           null|        null|         null|           null|          null|          null|          null|                              null|        null|                          null|                   null|                         null|
+-------+-------------------+-----------+----------------------+------------+------------------+----------------+------------------+------------+-------------+-------------------+------------+----------------------------+-------------------------------+------------+-------------+---------------+--------------+--------------+--------------+----------------------------------+------------+------------------------------+-----------------------+-----------------------------+
only showing top 5 rows

It seems I am not reading the rows correctly.

But if I drop the schema option it loads fine; however, since the raw data has no header, the resulting dataframe does not look the way I want it to.

df = spark.read.load("/mnt/mi-sa-armor/INPUT/FANNIEMAE/2001Q1/Acquisition_2001Q1.txt", format="csv", sep="|", header="false")
+------------+---+--------------------+-----+------+---+-------+-------+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|         _c0|_c1|                 _c2|  _c3|   _c4|_c5|    _c6|    _c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23|_c24|
+------------+---+--------------------+-----+------+---+-------+-------+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|100005072756|  R|               OTHER|    8|125000|360|11/2000|01/2001| 95| 95|   1|  29| 694|   N|   P|  SF|   1|   P|  MI| 492|  30| FRM|null|   1|   N|
|100006905529|  C|  FLAGSTAR BANK, FSB|7.875| 99000|360|12/2000|02/2001| 80| 80|   1|  55| 689|   N|   C|  SF|   1|   P|  CA| 923|null| FRM|null|null|   N|
|100010275736|  R|JPMORGAN CHASE BA...|7.625|140000|360|12/2000|02/2001| 21| 21|   1|  12| 796|   N|   R|  SF|   1|   P|  CO| 803|null| FRM|null|null|   N|
|100015654434|  B|  FLAGSTAR BANK, FSB|7.625|102000|360|03/2001|05/2001| 80| 80|   2|  32| 706|   N|   C|  SF|   1|   P|  WI| 541|null| FRM| 661|null|   N|
|100020827204|  R|               OTHER|7.125|214000|360|01/2001|03/2001| 80| 80|   1|  43| 731|   N|   P|  SF|   4|   P|  IL| 604|null| FRM|null|null|   N|
+------------+---+--------------------+-----+------+---+-------+-------+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
only showing top 5 rows

Could you help me troubleshoot my code and figure out why it fails to load the data correctly when a custom schema is used?

Many thanks.

apache-spark apache-spark-sql pyspark-sql
2 Answers
0 votes

When you enforce a schema, Spark returns null for records that do not conform to the given schema.

Spark's CSV reader has three modes for parsing a file:

The default is PERMISSIVE. The possible values are:

  1. PERMISSIVE: tries to parse all lines; nulls are inserted for missing tokens and extra tokens are ignored. If a record is malformed, for example because of a data type or format mismatch, the entire record is set to null.

  2. DROPMALFORMED: drops lines that have fewer or more tokens than expected, or whose tokens do not match the schema.

  3. FAILFAST: aborts with a RuntimeException as soon as a malformed line is encountered.

You did not set a mode, so you are getting the default behavior.
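
If you want different behavior, you can set the mode explicitly. A minimal sketch, reusing the path and origSchema from the question (mode is a standard Spark CSV reader option):

# Drop records that do not match the schema instead of nulling them out
df = spark.read.load("/mnt/mi-sa-armor/INPUT/FANNIEMAE/2001Q1/Acquisition_2001Q1.txt",
                     format="csv", sep="|", header="false", schema=origSchema,
                     mode="DROPMALFORMED")

# Or abort on the first bad record instead:
# mode="FAILFAST"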

If you only want the column names to show up correctly, set all the columns in the schema to StringType, as sketched below.
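
A minimal sketch of building that all-string schema from the field names already defined in origSchema (stringSchema and df_raw are illustrative names, not part of the original code):

from pyspark.sql.types import StructType, StructField, StringType

# Keep the field names from origSchema, but force every column to StringType
stringSchema = StructType([StructField(f.name, StringType(), True)
                           for f in origSchema.fields])

df_raw = spark.read.load("/mnt/mi-sa-armor/INPUT/FANNIEMAE/2001Q1/Acquisition_2001Q1.txt",
                         format="csv", sep="|", header="false", schema=stringSchema)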

Once you can see that the data looks right, format what needs formatting (for example the date columns) and cast where necessary.
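
As an illustration, the casting step on a few of the columns (column names come from the question's schema; df_raw is the all-string dataframe from the sketch above):

from pyspark.sql.functions import col

# Cast selected string columns to their intended numeric types
df_typed = (df_raw
            .withColumn("original_interest_rate", col("original_interest_rate").cast("double"))
            .withColumn("original_upb", col("original_upb").cast("double"))
            .withColumn("original_loan_term", col("original_loan_term").cast("int"))
            .withColumn("original_ltv", col("original_ltv").cast("int")))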


0 votes

It is the columns you declared as dates that are killing it. Set them to StringType and fix them after you have the dataframe; without more information, Spark will not be able to parse your dates.

from pyspark.sql.types import *
origSchema = StructType([
                         StructField("loan_id", StringType(), True),
                         StructField("origination_channel", StringType(), True),
                         StructField("seller_name", StringType(), True),
                         StructField("original_interest_rate", DoubleType(), True),
                         StructField("original_upb", DoubleType(), True),
                         StructField("original_loan_term", IntegerType(), True),
                         StructField("origination_date", StringType(), True),
                         StructField("first_payment_date", StringType(), True),
                         StructField("original_ltv", IntegerType(), True),
                         StructField("original_cltv", IntegerType(), True),
                         StructField("number_of_borrowers", IntegerType(), True),
                         StructField("original_dti", IntegerType(), True),
                         StructField("borrower_fico_at_origination", IntegerType(), True),
                         StructField("first_time_home_buyer_indicator", StringType(), True),
                         StructField("loan_purpose", StringType(), True),
                         StructField("property_type", StringType(), True),
                         StructField("number_of_units", StringType(), True),
                         StructField("occupancy_type", StringType(), True),
                         StructField("property_state", StringType(), True),
                         StructField("zip_code_short", StringType(), True),
                         StructField("primary_mortgage_insurance_percent", IntegerType(), True),
                         StructField("product_type", StringType(), True),
                         StructField("coborrower_fico_at_origination", IntegerType(), True),
                         StructField("mortgage_insurance_type", StringType(), True),
                         StructField("relocation_mortgage_indicator", StringType(), True)
])

df = spark.read.schema(origSchema).option("sep", "|").format("csv").load("/path/to/data/name_of_file.txt")
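
With the two date columns loaded as strings, one way to fix them afterwards is to_date with an explicit pattern (a sketch assuming the MM/yyyy layout seen in the sample data; the parsed value falls on the first day of the month):

from pyspark.sql.functions import to_date, col

# Turn "11/2000"-style strings into proper dates
df = (df
      .withColumn("origination_date", to_date(col("origination_date"), "MM/yyyy"))
      .withColumn("first_payment_date", to_date(col("first_payment_date"), "MM/yyyy")))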