pyspark-java.lang.IllegalStateException:输入行没有架构所需的预期值

问题描述 投票:0回答:3

我在Horton沙箱上运行pyspark-sql代码

18/08/11 17:02:22 INFO kick.SparkContext:运行Spark版本1.6.3

# code 
from pyspark.sql import *
from pyspark.sql.types import *
rdd1 = sc.textFile ("/user/maria_dev/spark_data/products.csv")
rdd2 = rdd1.map( lambda x : x.split("," ) )
df1 = sqlContext.createDataFrame(rdd2, ["id","cat_id","name","desc","price", "url"])
df1.printSchema()

root
 |-- id: string (nullable = true)
 |-- cat_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- price: string (nullable = true)
 |-- url: string (nullable = true)

df1.show() 
+---+------+--------------------+----+------+--------------------+
| id|cat_id|                name|desc| price|                 url|
+---+------+--------------------+----+------+--------------------+
|  1|     2|Quest Q64 10 FT. ...|    | 59.98|http://images.acm...|
|  2|     2|Under Armour Men'...|    |129.99|http://images.acm...|
|  3|     2|Under Armour Men'...|    | 89.99|http://images.acm...|
|  4|     2|Under Armour Men'...|    | 89.99|http://images.acm...|
|  5|     2|Riddell Youth Rev...|    |199.99|http://images.acm...|

# When I try to get counts I get the following error.
df1.count()

**Caused by: java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 6 fields are required while 7 values are provided.**

# I get the same error for the following code as well
df1.registerTempTable("products_tab")
df_query = sqlContext.sql ("select id, name, desc from products_tab order by name, id ").show();

我看到列desc为null,不确定在创建数据框和使用任何方法时是否需要以不同方式处理null列。

运行SQL查询时出现相同的错误。似乎sql错误是由于“order by”子句,如果我删除顺序然后查询运行成功。

如果您需要更多信息,请与我们联系,并了解如何处理此错误。

我试图查看名称字段是否包含Chandan Ray建议的任何逗号。名称字段中没有逗号。

rdd1.count()
=> 1345
rdd2.count()
=> 1345
# clipping id and name column from rdd2
rdd_name = rdd2.map(lambda x: (x[0], x[2]) )
rdd_name.count()
=>1345
rdd_name_comma = rdd_name.filter (lambda x : True if x[1].find(",") != -1  else False )
rdd_name_comma.count()
==> 0
apache-spark pyspark-sql hortonworks-data-platform
3个回答
0
投票

我想你的名字字段中有逗号,所以它也分裂了。所以它期待7列

可能存在一些格格不入的线条。

请尝试使用以下代码将错误记录排除在一个文件中

val df = spark.read.format(“csv”).option("badRecordsPath", "/tmp/badRecordsPath").load(“csvpath”)

//它将读取csv并创建一个数据帧,如果有任何格式错误的记录,它会将其移动到您提供的路径中。

//请阅读以下内容

https://docs.databricks.com/spark/latest/spark-sql/handling-bad-records.html


0
投票

我找到了问题 - 这是由于一个错误的记录,其中逗号嵌入在字符串中。即使字符串是双引号,python将字符串拆分为2列。我尝试使用databricks包

# from command prompt
pyspark --packages com.databricks:spark-csv_2.10:1.4.0

# on pyspark 
 schema1 = StructType ([ StructField("id",IntegerType(), True), \
         StructField("cat_id",IntegerType(), True), \
         StructField("name",StringType(), True),\
         StructField("desc",StringType(), True),\
         StructField("price",DecimalType(), True), \
         StructField("url",StringType(), True)
         ])

df1 = sqlContext.read.format('com.databricks.spark.csv').schema(schema1).load('/user/maria_dev/spark_data/products.csv')
        df1.show()
df1.show()
    +---+------+--------------------+----+-----+--------------------+
    | id|cat_id|                name|desc|price|                 url|
    +---+------+--------------------+----+-----+--------------------+
    |  1|     2|Quest Q64 10 FT. ...|    |   60|http://images.acm...|
    |  2|     2|Under Armour Men'...|    |  130|http://images.acm...|
    |  3|     2|Under Armour Men'...|    |   90|http://images.acm...|
    |  4|     2|Under Armour Men'...|    |   90|http://images.acm...|
    |  5|     2|Riddell Youth Rev...|    |  200|http://images.acm...|

df1.printSchema()
    root
     |-- id: integer (nullable = true)
     |-- cat_id: integer (nullable = true)
     |-- name: string (nullable = true)
     |-- desc: string (nullable = true)
     |-- price: decimal(10,0) (nullable = true)
     |-- url: string (nullable = true)

df1.count()
     1345

0
投票

以下是我对清理此类记录的看法,我们通常遇到这样的情况:

一个。如果“,”是列上最好的分隔符,则不会查看创建文件时数据的异常。

以下是我对此案的解决方案:

解决方案a:在这种情况下,如果该记录是合格记录,我们希望将流程识别为数据清理的一部分。如果将其他记录路由到错误的文件/集合,则可以协调这些记录。

以下是我的数据集的结构(product_id,product_name,unit_price)

1,product-1,10
2,product-2,20
3,product,3,30

在上述情况下,产品3应该被读作产品-3,当产品注册时可能是一个错字。在这种情况下,以下样本将起作用。

>>> tf=open("C:/users/ip2134/pyspark_practice/test_file.txt")
>>> trec=tf.read().splitlines()
>>> for rec in trec:
...   if rec.count(",") == 2:
...      trec_clean.append(rec)
...   else:
...      trec_bad.append(rec)
...
>>> trec_clean
['1,product-1,10', '2,product-2,20']
>>> trec_bad
['3,product,3,30']
>>> trec
['1,product-1,10', '2,product-2,20','3,product,3,30']

处理此问题的另一种方法是尝试查看skipinitialspace = True是否可以解析列。

(参考文献:Python parse CSV ignoring comma with double-quotes

© www.soinside.com 2019 - 2024. All rights reserved.