Spark RDD类型的混淆

问题描述 投票:0回答:1

我只是学习Spark并从RDD开始,现在转到DataFrames。在我当前的pyspark项目中,我正在将S3文件读入RDD,并对它们进行一些简单的转换。这是代码。

segmentsRDD = sc.textFile(fileLocation). \
    filter(lambda line: line.split(",")[6] in INCLUDE_SITES). \
    filter(lambda line: line.split(",")[2] not in EXCLUDE_MARKETS). \
    filter(lambda line: "null" not in line). \
    map(splitComma). \
    filter(lambda line: line.split(",")[5] == '1')

SplitComma是一个对行数据进行一些日期计算并返回10个逗号分隔的字段的函数。一旦知道,我将运行最后一个过滤器,如图所示,仅对字段[5] = 1中的值的拾取行进行操作。到目前为止,一切都很好。

[接下来,我想使用如下所示的模式将segmentRDD转换为DF。

interim_segmentsDF = segmentsRDD.map(lambda x: x.split(",")).toDF("itemid","market","itemkey","start_offset","end_offset","time_shifted","day_shifted","tmsmarketid","caption","itemstarttime")

但是我收到有关无法将“ pyspark.rdd.PipelinedRDD”转换为DataFrame的错误。您能解释一下“ pyspark.rdd.PipelinedRDD”和“ row RDD”之间的区别吗?我正在尝试使用所示的架构转换为DF。我在这里想念什么?

谢谢

pyspark apache-spark-sql rdd
1个回答
0
投票

您必须在代码中添加以下行:

from pyspark.sql import SparkSession
spark = SparkSession(sc)

方法.toDF()不是rdd的方法。通过SparkSession初始化,当您运行rdd.toDF()时,您将直接运行数据框方法.toDF()

Take a look in the Spark source code

© www.soinside.com 2019 - 2024. All rights reserved.