我需要将一段Scala代码转换为python代码,这段Scala代码将字符串的RDD转换为case-class的RDD。代码如下。
case class Stock(
stockName: String,
dt: String,
openPrice: Double,
highPrice: Double,
lowPrice: Double,
closePrice: Double,
adjClosePrice: Double,
volume: Double
)
def parseStock(inputRecord: String, stockName: String): Stock = {
val coloumn = inputRecord.split(",")
Stock(
stockName,
coloumn(0),
coloumn(1).toDouble,
coloumn(2).toDouble,
coloumn(3).toDouble,
coloumn(4).toDouble,
coloumn(5).toDouble,
coloumn(6).toDouble)
}
def parseRDD(rdd: RDD[String], stockName: String): RDD[Stock] = {
val header = rdd.first
rdd.filter((data) => {
data(0) != header(0) && !data.contains("null")
})
.map(data => parseStock(data, stockName))
}
可以在PySpark中实现吗?我试着使用下面的代码,但它给出了错误信息
from dataclasses import dataclass
@dataclass(eq=True,frozen=True)
class Stock:
stockName : str
dt: str
openPrice: float
highPrice: float
lowPrice: float
closePrice: float
adjClosePrice: float
volume: float
def parseStock(inputRecord, stockName):
coloumn = inputRecord.split(",")
return Stock(stockName,
coloumn[0],
coloumn[1],
coloumn[2],
coloumn[3],
coloumn[4],
coloumn[5],
coloumn[6])
def parseRDD(rdd, stockName):
header = rdd.first()
res = rdd.filter(lambda data : data != header).map(lambda data : parseStock(data, stockName))
return res
错误Py4JJavaError: 在调用z:org.apache.spark.api.python.PythonRDD.collectAndServe.时发生错误:org.apache.spark.SparkException: 由于阶段性失败,作业中止。21.0阶段的任务0失败1次,最近一次失败。失去了21.0阶段的任务0.0(TID 31,localhost,执行者驱动):org.apache.spark.api.python.PythonException: 回溯 (最近一次调用)。
文件 "contentspark-2.4.5-bin-hadoop2.7pythonlibpyspark.zippysparkworker.py",第364行,在main func, profiler, deserializer, serializer = read_command(pickleSer, infile) 文件 "contentspark-2.4.5-bin-hadoop2.7pythonlibpyspark.zippysparkworker.py",第69行,在read_command command = serializer. 文件 "contentspark-2.4.5-bin-hadoop2.7pythonlibpyspark.zippysparkserializers.py",第173行,在_read_with_length中,返回self.com。 loads(obj) 文件 "contentspark-2.4.5-bin-hadoop2.7pythonlibpyspark.zippysparkserializers.py",第587行,在 loads中 return pickle.loads(obj, encoding=encoding)AttributeError: 无法获取属性''。主'上
Dataset API在python中是不可用的。
"Dataset "是一个分布式的数据集合。Dataset是Spark 1.6中新增的一个接口,它提供了RDDs的优点(强类型化,能够使用强大的lambda函数)和Spark SQL的优化执行引擎的优点。一个Dataset可以从JVM对象中构建,然后使用功能转换(map、flatMap、filter等)进行操作。Dataset API可以在Scala和Java中使用。Python没有对Dataset API的支持。但由于Python的动态特性,Dataset API的许多优点已经可以使用(即你可以通过名称自然地访问行的字段 row.columnName)。R的情况也类似。"