我在Horton沙箱上运行pyspark-sql代码
18/08/11 17:02:22 INFO kick.SparkContext:运行Spark版本1.6.3
# code
from pyspark.sql import *
from pyspark.sql.types import *
rdd1 = sc.textFile ("/user/maria_dev/spark_data/products.csv")
rdd2 = rdd1.map( lambda x : x.split("," ) )
df1 = sqlContext.createDataFrame(rdd2, ["id","cat_id","name","desc","price", "url"])
df1.printSchema()
root
|-- id: string (nullable = true)
|-- cat_id: string (nullable = true)
|-- name: string (nullable = true)
|-- desc: string (nullable = true)
|-- price: string (nullable = true)
|-- url: string (nullable = true)
df1.show()
+---+------+--------------------+----+------+--------------------+
| id|cat_id| name|desc| price| url|
+---+------+--------------------+----+------+--------------------+
| 1| 2|Quest Q64 10 FT. ...| | 59.98|http://images.acm...|
| 2| 2|Under Armour Men'...| |129.99|http://images.acm...|
| 3| 2|Under Armour Men'...| | 89.99|http://images.acm...|
| 4| 2|Under Armour Men'...| | 89.99|http://images.acm...|
| 5| 2|Riddell Youth Rev...| |199.99|http://images.acm...|
# When I try to get counts I get the following error.
df1.count()
**Caused by: java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 6 fields are required while 7 values are provided.**
# I get the same error for the following code as well
df1.registerTempTable("products_tab")
df_query = sqlContext.sql ("select id, name, desc from products_tab order by name, id ").show();
我看到列desc为null,不确定在创建数据框和使用任何方法时是否需要以不同方式处理null列。
运行SQL查询时出现相同的错误。似乎sql错误是由于“order by”子句,如果我删除顺序然后查询运行成功。
如果您需要更多信息,请与我们联系,并了解如何处理此错误。
我试图查看名称字段是否包含Chandan Ray建议的任何逗号。名称字段中没有逗号。
rdd1.count()
=> 1345
rdd2.count()
=> 1345
# clipping id and name column from rdd2
rdd_name = rdd2.map(lambda x: (x[0], x[2]) )
rdd_name.count()
=>1345
rdd_name_comma = rdd_name.filter (lambda x : True if x[1].find(",") != -1 else False )
rdd_name_comma.count()
==> 0
我想你的名字字段中有逗号,所以它也分裂了。所以它期待7列
可能存在一些格格不入的线条。
请尝试使用以下代码将错误记录排除在一个文件中
val df = spark.read.format(“csv”).option("badRecordsPath", "/tmp/badRecordsPath").load(“csvpath”)
//它将读取csv并创建一个数据帧,如果有任何格式错误的记录,它会将其移动到您提供的路径中。
//请阅读以下内容
https://docs.databricks.com/spark/latest/spark-sql/handling-bad-records.html
我找到了问题 - 这是由于一个错误的记录,其中逗号嵌入在字符串中。即使字符串是双引号,python将字符串拆分为2列。我尝试使用databricks包
# from command prompt
pyspark --packages com.databricks:spark-csv_2.10:1.4.0
# on pyspark
schema1 = StructType ([ StructField("id",IntegerType(), True), \
StructField("cat_id",IntegerType(), True), \
StructField("name",StringType(), True),\
StructField("desc",StringType(), True),\
StructField("price",DecimalType(), True), \
StructField("url",StringType(), True)
])
df1 = sqlContext.read.format('com.databricks.spark.csv').schema(schema1).load('/user/maria_dev/spark_data/products.csv')
df1.show()
df1.show()
+---+------+--------------------+----+-----+--------------------+
| id|cat_id| name|desc|price| url|
+---+------+--------------------+----+-----+--------------------+
| 1| 2|Quest Q64 10 FT. ...| | 60|http://images.acm...|
| 2| 2|Under Armour Men'...| | 130|http://images.acm...|
| 3| 2|Under Armour Men'...| | 90|http://images.acm...|
| 4| 2|Under Armour Men'...| | 90|http://images.acm...|
| 5| 2|Riddell Youth Rev...| | 200|http://images.acm...|
df1.printSchema()
root
|-- id: integer (nullable = true)
|-- cat_id: integer (nullable = true)
|-- name: string (nullable = true)
|-- desc: string (nullable = true)
|-- price: decimal(10,0) (nullable = true)
|-- url: string (nullable = true)
df1.count()
1345
以下是我对清理此类记录的看法,我们通常遇到这样的情况:
一个。如果“,”是列上最好的分隔符,则不会查看创建文件时数据的异常。
以下是我对此案的解决方案:
解决方案a:在这种情况下,如果该记录是合格记录,我们希望将流程识别为数据清理的一部分。如果将其他记录路由到错误的文件/集合,则可以协调这些记录。
以下是我的数据集的结构(product_id,product_name,unit_price)
1,product-1,10
2,product-2,20
3,product,3,30
在上述情况下,产品3应该被读作产品-3,当产品注册时可能是一个错字。在这种情况下,以下样本将起作用。
>>> tf=open("C:/users/ip2134/pyspark_practice/test_file.txt")
>>> trec=tf.read().splitlines()
>>> for rec in trec:
... if rec.count(",") == 2:
... trec_clean.append(rec)
... else:
... trec_bad.append(rec)
...
>>> trec_clean
['1,product-1,10', '2,product-2,20']
>>> trec_bad
['3,product,3,30']
>>> trec
['1,product-1,10', '2,product-2,20','3,product,3,30']
处理此问题的另一种方法是尝试查看skipinitialspace = True是否可以解析列。