我只是学习Spark并从RDD开始,现在转到DataFrames。在我当前的pyspark项目中,我正在将S3文件读入RDD,并对它们进行一些简单的转换。这是代码。
segmentsRDD = sc.textFile(fileLocation). \
filter(lambda line: line.split(",")[6] in INCLUDE_SITES). \
filter(lambda line: line.split(",")[2] not in EXCLUDE_MARKETS). \
filter(lambda line: "null" not in line). \
map(splitComma). \
filter(lambda line: line.split(",")[5] == '1')
SplitComma是一个对行数据进行一些日期计算并返回10个逗号分隔的字段的函数。一旦知道,我将运行最后一个过滤器,如图所示,仅对字段[5] = 1中的值的拾取行进行操作。到目前为止,一切都很好。
[接下来,我想使用如下所示的模式将segmentRDD转换为DF。
interim_segmentsDF = segmentsRDD.map(lambda x: x.split(",")).toDF("itemid","market","itemkey","start_offset","end_offset","time_shifted","day_shifted","tmsmarketid","caption","itemstarttime")
但是我收到有关无法将“ pyspark.rdd.PipelinedRDD”转换为DataFrame的错误。您能解释一下“ pyspark.rdd.PipelinedRDD”和“ row RDD”之间的区别吗?我正在尝试使用所示的架构转换为DF。我在这里想念什么?
谢谢
您必须在代码中添加以下行:
from pyspark.sql import SparkSession
spark = SparkSession(sc)
方法.toDF()
不是rdd的方法。通过SparkSession初始化,当您运行rdd.toDF()
时,您将直接运行数据框方法.toDF()
。