在PySpark中创建Apache Spark类的RDD。

问题描述 投票:0回答:1

我需要将一段Scala代码转换为python代码,这段Scala代码将字符串的RDD转换为case-class的RDD。代码如下。

case class Stock(
                  stockName: String,
                  dt: String,
                  openPrice: Double,
                  highPrice: Double,
                  lowPrice: Double,
                  closePrice: Double,
                  adjClosePrice: Double,
                  volume: Double
                )


  def parseStock(inputRecord: String, stockName: String): Stock = {
    val coloumn = inputRecord.split(",")
    Stock(
      stockName,
      coloumn(0),
      coloumn(1).toDouble,
      coloumn(2).toDouble,
      coloumn(3).toDouble,
      coloumn(4).toDouble,
      coloumn(5).toDouble,
      coloumn(6).toDouble)
  }

  def parseRDD(rdd: RDD[String], stockName: String): RDD[Stock] = {
    val header = rdd.first
    rdd.filter((data) => {
      data(0) != header(0) && !data.contains("null")
    })
      .map(data => parseStock(data, stockName))
  }  

可以在PySpark中实现吗?我试着使用下面的代码,但它给出了错误信息

from dataclasses import dataclass

@dataclass(eq=True,frozen=True)
class Stock:
    stockName : str
    dt: str
    openPrice: float
    highPrice: float
    lowPrice: float
    closePrice: float
    adjClosePrice: float
    volume: float




def parseStock(inputRecord, stockName):
  coloumn = inputRecord.split(",")
  return Stock(stockName,
               coloumn[0],
               coloumn[1],
               coloumn[2],
               coloumn[3],
               coloumn[4],
               coloumn[5],
               coloumn[6])

def parseRDD(rdd, stockName):
  header = rdd.first()
  res = rdd.filter(lambda data : data != header).map(lambda data : parseStock(data, stockName))
  return res

错误Py4JJavaError: 在调用z:org.apache.spark.api.python.PythonRDD.collectAndServe.时发生错误:org.apache.spark.SparkException: 由于阶段性失败,作业中止。21.0阶段的任务0失败1次,最近一次失败。失去了21.0阶段的任务0.0(TID 31,localhost,执行者驱动):org.apache.spark.api.python.PythonException: 回溯 (最近一次调用)。

文件 "contentspark-2.4.5-bin-hadoop2.7pythonlibpyspark.zippysparkworker.py",第364行,在main func, profiler, deserializer, serializer = read_command(pickleSer, infile) 文件 "contentspark-2.4.5-bin-hadoop2.7pythonlibpyspark.zippysparkworker.py",第69行,在read_command command = serializer. 文件 "contentspark-2.4.5-bin-hadoop2.7pythonlibpyspark.zippysparkserializers.py",第173行,在_read_with_length中,返回self.com。 loads(obj) 文件 "contentspark-2.4.5-bin-hadoop2.7pythonlibpyspark.zippysparkserializers.py",第587行,在 loads中 return pickle.loads(obj, encoding=encoding)AttributeError: 无法获取属性''。'上

apache-spark pyspark rdd case-class python-dataclasses
1个回答
1
投票

Dataset API在python中是不可用的。

"Dataset "是一个分布式的数据集合。Dataset是Spark 1.6中新增的一个接口,它提供了RDDs的优点(强类型化,能够使用强大的lambda函数)和Spark SQL的优化执行引擎的优点。一个Dataset可以从JVM对象中构建,然后使用功能转换(map、flatMap、filter等)进行操作。Dataset API可以在Scala和Java中使用。Python没有对Dataset API的支持。但由于Python的动态特性,Dataset API的许多优点已经可以使用(即你可以通过名称自然地访问行的字段 row.columnName)。R的情况也类似。"

https:/spark.apache.orgdocslatestsql-编程指南.html

© www.soinside.com 2019 - 2024. All rights reserved.