我的目标是每天每隔15分钟显示一次数据(由csv文件提供)。
我提出的解决方案是一个sql查询,它正在创建我需要的数据:
select
dateadd(minute, datediff(minute, 0, cast ([date] + ' ' + [time] as datetime2) ) / 15 * 15, 0) as dateInterval,
SecurityDesc,
StartPrice,
SUM(CAST(TradedVolume as decimal(18,2))) as totalTradedVolume,
SUM(cast(NumberOfTrades as int)) as totalNumberOfTrades,
ROW_NUMBER() over(PARTITION BY dateadd(minute, datediff(minute, 0, cast ([date] + ' ' + [time] as datetime) ) / 15 * 15, 0) ORDER BY Date) as rn
from MyTable
group by [date],[time],SecurityDesc,StartPrice
但是一旦我想在我的Spark python代码中使用它,它会抱怨datediff / dateadd,甚至会转换为datetime。
我理解它可能无法看到sql函数但我导入了:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import Row
import pyspark.sql.functions as F
from datetime import datetime as d
from pyspark.sql.functions import datediff, to_date, lit
我该怎么做才能让它发挥作用?我更喜欢让我的查询工作,如果不是如何一般我可以在spark python中显示每15分钟的聚合数据?
更新:寻找获取数据的结果如下:
您已使用别名导入函数(我认为这是一个很好的做法):
import pyspark.sql.functions as F
这意味着您需要使用F
变量来使用F.to_date
等导入函数。您正在使用的函数是SQL查询函数,不属于pyspark.sql.functions中可用的实际函数(有关可用函数的列表,请参阅文档here)
要在Spark中解决您的问题,我将使用dataFrame,然后使用它来使用spark函数计算结果。
P.S下一次,最好发布实际的错误消息,而不是说火花“抱怨”;)