Error when converting a pandas DataFrame to a Spark DataFrame

Problem description · votes: 0 · answers: 2

Since Spark has no out-of-the-box support for reading Excel files, I first read the Excel file into a pandas DataFrame and then try to convert that pandas DataFrame to a Spark DataFrame, but I get the error below (I am using Spark 1.5.1).

import pandas as pd
from pandas import ExcelFile
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# Read the Excel file with pandas, then hand the DataFrame to Spark
# (sqlContext is the SQLContext available in the PySpark shell).
pdf = pd.read_excel('/home/testdata/test.xlsx')
df = sqlContext.createDataFrame(pdf)

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/spark/spark-hadoop/python/pyspark/sql/context.py", line 406, in createDataFrame
    rdd, schema = self._createFromLocal(data, schema)
  File "/opt/spark/spark-hadoop/python/pyspark/sql/context.py", line 337, in _createFromLocal
    data = [schema.toInternal(row) for row in data]
  File "/opt/spark/spark-hadoop/python/pyspark/sql/types.py", line 541, in toInternal
    return tuple(f.toInternal(v) for f, v in zip(self.fields, obj))
  File "/opt/spark/spark-hadoop/python/pyspark/sql/types.py", line 541, in <genexpr>
    return tuple(f.toInternal(v) for f, v in zip(self.fields, obj))
  File "/opt/spark/spark-hadoop/python/pyspark/sql/types.py", line 435, in toInternal
    return self.dataType.toInternal(obj)
  File "/opt/spark/spark-hadoop/python/pyspark/sql/types.py", line 191, in toInternal
    else time.mktime(dt.timetuple()))
AttributeError: 'datetime.time' object has no attribute 'timetuple'

Does anyone know how to fix this?

python pandas pyspark
2 Answers
1 vote

My best guess is that your problem is the "wrong" parsing of the datetime data when you read it with pandas.

The following code "just works":

import pandas as pd
from pandas import ExcelFile
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# Tell pandas which columns are datetimes so they are parsed as
# datetime64 instead of plain Python time/date objects.
pdf = pd.read_excel('test.xlsx', parse_dates=['Created on', 'Confirmation time'])

sc = SparkContext()
sqlContext = SQLContext(sc)

sqlContext.createDataFrame(data=pdf).collect()

[Row(Customer=1000935702, Country='TW',  ...

Note that you also have another datetime column, 'Confirmation date', which in your sample consists only of NaT values, so reading your short sample into an RDD works without problems. But if the full dataset happens to contain real data in that column, you will have to take care of it as well.
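A minimal sketch of how that could look (the column names are assumptions taken from this answer's example; adjust them to your actual file): parse 'Confirmation date' up front as well, and coerce anything unparseable to NaT so Spark never receives stray Python time objects.

import pandas as pd

# Column names here are assumed from the example above.
pdf = pd.read_excel('test.xlsx',
                    parse_dates=['Created on', 'Confirmation time', 'Confirmation date'])

# Safety net: values pandas could not parse become NaT instead of
# arbitrary Python objects that Spark 1.5 cannot convert.
pdf['Confirmation date'] = pd.to_datetime(pdf['Confirmation date'], errors='coerce')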


0 votes

Explicitly defining the schema solves the problem. Depending on your use case, you can specify the schema dynamically, as in the snippet below:

import pandas as pd
from pyspark.sql.types import *

# Map pandas dtypes to Spark types: datetime64 -> TimestampType,
# other datetime-like -> DateType, float -> DoubleType, else StringType.
schema = StructType([
    StructField(name,
                TimestampType() if pd.api.types.is_datetime64_dtype(col) else
                DateType() if pd.api.types.is_datetime64_any_dtype(col) else
                DoubleType() if pd.api.types.is_float_dtype(col) else
                StringType(),
                True)
    for name, col in zip(df.columns, df.dtypes)])

sparkDf = spark.createDataFrame(df, schema)
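As a usage sketch, reusing the schema built above (and assuming spark is a SparkSession and df is the pandas DataFrame read from the Excel file in the question; on Spark 1.5 you would call sqlContext.createDataFrame instead), you can check which Spark type each column ends up with:

import pandas as pd

df = pd.read_excel('/home/testdata/test.xlsx')

# The pandas dtypes drive the mapping above.
print(df.dtypes)

# Build the schema as in the snippet above, then convert and verify.
sparkDf = spark.createDataFrame(df, schema)
sparkDf.printSchema()
sparkDf.show(5)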