How do I convert a DataFrame to RDDs in Structured Streaming?

Question · votes: 1 · answers: 2

I'm using PySpark Structured Streaming to read data from Kafka; the result is a DataFrame. When I try to convert that DataFrame to an RDD, it fails with:

Traceback (most recent call last):
File "/home/docs/dp_model/dp_algo_platform/dp_algo_core/test/test.py", line 36, in <module>
df = df.rdd.map(lambda x: x.value.split(" ")).toDF()
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 91, in rdd
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'

Here is the working version of the code:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .load()

df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df = df.withColumn("s", F.split(df['value'], " "))
df = df.withColumn('e', F.explode(df['s']))
# df = df.rdd.map(lambda x: x.value.split(" ")).toDF()

q = df.writeStream \
    .format("console") \
    .trigger(processingTime='30 seconds') \
    .start()

q.awaitTermination()

And here is the failing version:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .load()

df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# df = df.withColumn("s", F.split(df['value'], " "))
# df = df.withColumn('e', F.explode(df['s']))
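# the next line raises the AnalysisException shown above: queries with
# streaming sources must be executed with writeStream.start()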
df = df.rdd.map(lambda x: x.value.split(" ")).toDF()

q = df.writeStream \
    .format("console") \
    .trigger(processingTime='30 seconds') \
    .start()

q.awaitTermination()

Why can't the DataFrame be converted to an RDD, and what should I do instead when I need RDD-style operations on a streaming DataFrame in PySpark?

apache-spark spark-streaming
2 Answers

1 vote

The RDD side of this is simply not supported. RDDs are the legacy API; Spark Structured Streaming is built on DataFrames/Datasets, a unified abstraction that covers both streaming and batch processing.
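
If you genuinely need RDD-style processing, a common workaround (a minimal sketch, not part of this answer; it assumes Spark 2.4+, which matches the question's 2.4.3, and the same Kafka source) is foreachBatch. The callback receives a static DataFrame per micro-batch, and .rdd is legal on a static DataFrame:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .load() \
    .selectExpr("CAST(value AS STRING)")

def process_batch(batch_df, batch_id):
    # batch_df is a plain (non-streaming) DataFrame, so .rdd is allowed here
    words = batch_df.rdd.flatMap(lambda row: row.value.split(" "))
    print(words.take(10))  # illustrative only; do your real per-batch work here

q = df.writeStream \
    .foreachBatch(process_batch) \
    .trigger(processingTime='30 seconds') \
    .start()

q.awaitTermination()

Inside foreachBatch the micro-batch behaves like an ordinary batch DataFrame, so the whole batch API, including RDD conversion, is available there.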


0 votes

Structured Streaming runs on the Spark SQL engine, and converting a streaming DataFrame or Dataset to an RDD is not supported.
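
As a quick sanity check (my addition, not part of the original answer, assuming the df from the question's code): a streaming DataFrame is flagged by the isStreaming property, and the analyzer rejects .rdd whenever it is True.

print(df.isStreaming)  # True for a readStream source; .rdd only works when this is False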
