How to compute or manage streaming data in Pyspark?


I want to extract data from streaming data and then send it to a web page. For example, I want to compute the sum of the TotalSales column in the streaming data. But it fails at the following line: summary = dataStream.select('TotalSales').groupby().sum().toPandas(). Here is my code.

import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("Python Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()
schema = StructType().add("_c0", "integer").add("InvoiceNo", "string").add("Quantity","integer").add("InvoiceDate","date").add("UnitPrice","integer").add("CustomerID","double").add("TotalSales","integer")
INPUT_DIRECTORY = "C:/Users/HP/Desktop/test/jsonFile"
dataStream = spark.readStream.format("json").schema(schema).load(INPUT_DIRECTORY)
query = dataStream.writeStream.format("console").start()

summary = dataStream.select('TotalSales').groupby().sum().toPandas()
print(query.id)
query.awaitTermination()

This is the error shown on the command line.

Traceback (most recent call last):
  File "testStreaming.py", line 12, in <module>
    dataStream = dataStream.toPandas()
  File "C:\Users\HP\AppData\Local\Programs\Python\Python36\lib\site-packages\pyspark\sql\dataframe.py", line 2150, in toPandas
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "C:\Users\HP\AppData\Local\Programs\Python\Python36\lib\site-packages\pyspark\sql\dataframe.py", line 534, in collect
    sock_info = self._jdf.collectToPython()
  File "C:\Users\HP\AppData\Local\Programs\Python\Python36\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Users\HP\AppData\Local\Programs\Python\Python36\lib\site-packages\pyspark\sql\utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nFileSource[C:/Users/HP/Desktop/test/jsonFile]'

Thank you for your answers.

python pyspark apache-spark-sql spark-streaming pyspark-dataframes
1 Answer

Why are you creating a Pandas DataFrame at all?

toPandas creates a DataFrame that is local to your driver node. I am not sure what you are trying to achieve here: a Pandas DataFrame represents a fixed set of tuples, whereas Structured Streaming is a continuous flow of data. That is exactly what the AnalysisException is telling you; a query over a streaming source can only be executed through writeStream.start().
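If you just want to inspect the running total from the driver, one alternative is the memory sink, which keeps the aggregate in a driver-side table you can query at any time. A minimal sketch, assuming the same spark session and dataStream as in the question (the table name sales_summary is arbitrary):

# Keep the running aggregate in an in-memory table on the driver
summary = dataStream.select("TotalSales").groupBy().sum()
query = summary.writeStream.outputMode("complete") \
    .format("memory").queryName("sales_summary").start()

# Later: snapshot the table into pandas. The snapshot is a bounded
# batch result, so toPandas() is legal here.
snapshot = spark.sql("SELECT * FROM sales_summary").toPandas()

Note that the memory sink holds the whole result on the driver, so it is only suitable for small aggregates like this one.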

Now, one possible way to solve this is to run the whole computation you want inside the stream, write the output to files (parquet, csv, etc.), and then use those files to create the pandas DataFrame.

summary = dataStream.select('TotalSales').groupby().sum()
# A file sink only supports append mode, which cannot emit an unwatermarked
# aggregation, so save each micro-batch with foreachBatch (Spark 2.4+) instead
query = summary.writeStream.outputMode("complete").foreachBatch(
    lambda batch_df, batch_id: batch_df.write.mode("overwrite").parquet(outputPathDir)).start()
query.awaitTermination()
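Once the stream has written some output, a separate batch read can build the pandas DataFrame. A minimal sketch, assuming outputPathDir is the same directory the query above writes to:

# Batch-read the files the stream produced; this is bounded data,
# so toPandas() works without the streaming restriction
summary_pdf = spark.read.parquet(outputPathDir).toPandas()
print(summary_pdf)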