时间序列/刻度数据集的火花转换

问题描述 投票:1回答:1

我们在配置单元中有一个表,该表存储每个交易日结束时的交易订单数据作为order_date。其他重要的列是产品合约price(所下订单的价格),ttime(交易时间)状态(插入,更新或删除)价格(订单价格)

我们必须从主表以滴答数据的方式构建一个图表表,从市场开盘的早晨到那时,每行(订单)的最大和最小价格定单。即对于给定的订单,我们将有4列填充为maxPrice(到目前为止的最高价格),maxpriceOrderId(最高价格的订单编号),minPrice和minPriceOrderId这必须是针对每种产品的合同,即该产品合同中的最高和最低价格。

计算这些值时,我们需要排除所有未结订单从聚合。即,到目前为止,所有订单价格的最高和最低价格(不包括状态为“删除”的订单)

我们正在使用:Spark 2.2,输入数据格式为镶木地板。输入记录enter image description here

输出记录

enter image description here

提供简单的SQL视图-该问题通过自连接解决,看起来像这样:在ttime上设置订购数据后,我们必须获取特定产品的最高和最低价格,从早上到该订单时间的每一行(订单)都必须签订合同。这将批量处理每个eod(order_date)数据集:

select mainSet.order_id,    mainSet.product,mainSet.contract,mainSet.order_date,mainSet.price,mainSet.ttime,mainSet.status,
max(aggSet.price) over (partition by mainSet.product,mainSet.contract,mainSet.ttime) as max_price,
first_value(aggSet.order_id) over (partition by mainSet.product,mainSet.contract,mainSet.ttime) order by (aggSet.price desc,aggSet.ttime desc ) as maxOrderId
min(aggSet.price) over (partition by mainSet.product,mainSet.contract,mainSet.ttime) as min_price as min_price
first_value(aggSet.order_id) over (partition by mainSet.product,mainSet.contract,mainSet.ttime) order by (aggSet.price ,aggSet.ttime) as minOrderId
from order_table mainSet 
join order_table aggSet
ON (mainSet.produuct=aggSet.product,
mainSet.contract=aggSet.contract,
mainSet.ttime>=aggSet.ttime,
aggSet.status <> 'Remove')

Sparking in Spark

我们从如下所示的spark sql开始:

val mainDF: DataFrame= sparkSession.sql("select * from order_table where order_date ='eod_date' ")

  val ndf=mainDf.alias("mainSet").join(mainDf.alias("aggSet"),
        (col("mainSet.product")===col("aggSet.product")
          && col("mainSet.contract")===col("aggSet.contract")
          && col("mainSet.ttime")>= col("aggSet.ttime")
          && col("aggSet.status") <> "Remove")
        ,"inner")
        .select(mainSet.order_id,mainSet.ttime,mainSet.product,mainSet.contract,mainSet.order_date,mainSet.price,mainSet.status,aggSet.order_id as agg_orderid,aggSet.ttime as agg_ttime,price as agg_price) //Renaming of columns

  val max_window = Window.partitionBy(col("product"),col("contract"),col("ttime"))
  val min_window = Window.partitionBy(col("product"),col("contract"),col("ttime"))
  val maxPriceCol = max(col("agg_price")).over(max_window)
  val minPriceCol = min(col("agg_price")).over(min_window)
  val firstMaxorder = first_value(col("agg_orderid")).over(max_window.orderBy(col("agg_price").desc, col("agg_ttime").desc))
  val firstMinorder = first_value(col("agg_orderid")).over(min_window.orderBy(col("agg_price"), col("agg_ttime")))      


  val priceDF=  ndf.withColumn("max_price",maxPriceCol)
                    .withColumn("maxOrderId",firstMaxorder)
                    .withColumn("min_price",minPriceCol)
                    .withColumn("minOrderId",firstMinorder)

    priceDF.show(20)

音量统计:

平均计数700万条记录每个组(产品,合同)的平均计数= 600K

该作业运行了几个小时,但还没有完成。我曾尝试增加内存和其他参数,但是没有运气。作业卡住了,很多时候我遇到内存问题Container killed by YARN for exceeding memory limits. 4.9 GB of 4.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead

另一种方法

对最下面的组列(产品和合同)进行重新分区,然后按时在分区内进行排序,以便我们按时收到mapPartition函数的每一行。

执行mappartition,同时在分区级别维护一个集合(键为order_id,价格为值),以计算最高价和最低价及其订单号。

我们将在收到订单时继续从收集中删除状态为“删除”的订单。一旦为mapparition中的给定行更新了集合,我们就可以从集合中计算最大值和最小值,并返回更新后的行。

val mainDF: DataFrame= sparkSession.sql("select order_id,product,contract,order_date,price,status,null as maxPrice,null as maxPriceOrderId,null as minPrice,null as minPriceOrderId from order_table where order_date ='eod_date' ").repartitionByRange(col("product"),col("contract"))

case class summary(order_id:String ,ttime:string,product:String,contract :String,order_date:String,price:BigDecimal,status :String,var maxPrice:BigDecimal,var maxPriceOrderId:String ,var minPrice:BigDecimal,var minPriceOrderId String)

val summaryEncoder = Encoders.product[summary]
val priceDF= mainDF.as[summary](summaryEncoder).sortWithinPartitions(col("ttime")).mapPartitions( iter => {
    //collection at partition level
    //key as order_id and value as price
    var priceCollection = Map[String, BigDecimal]()

    iter.map( row => {
        val orderId= row.order_id
        val rowprice= row.price

        priceCollection = row.status match {
                            case "Remove" => if (priceCollection.contains(orderId)) priceCollection -= orderId
                            case _ => priceCollection += (orderId -> rowPrice)
                         }

        row.maxPrice = if(priceCollection.size > 0) priceCollection.maxBy(_._2)._2  // Gives key,value tuple from collectin for  max value )
        row.maxPriceOrderId = if(priceCollection.size > 0) priceCollection.maxBy(_._2)._1

        row.minPrice =  if(priceCollection.size > 0) priceCollection.minBy(_._2)._2   // Gives key,value tuple from collectin for  min value )
        row.minPriceOrderId = if(priceCollection.size > 0) priceCollection.minBy(_._2)._1

      row

    })
  }).show(20)

对于较小的数据集,它运行良好,并且在20分钟内完成了该操作,但是我发现对于23家工厂记录(具有17个差异产品和合同),结果似乎不正确。我可以看到来自mappartition的一个分区(输入分割)的数据正在进入另一个分区,从而弄乱了值。

-> 我们是否可以保证在这种情况下,每个mappartition任务都将在此处获取功能键(产品和合同)的所有数据。。据我所知,mappartition在每个spark分区上执行功能(类似于map reduce中的输入拆分),因此如何强制spark创建具有该产品和合同组所有值的inputsplit / partitions。

-> 是否有其他解决此问题的方法

非常感谢我们在这里的帮助。

dataframe apache-spark apache-spark-sql time-series apache-spark-dataset
1个回答
0
投票

您用来对数据进行重新分区的方法repartitionByRange对这些列表达式上的数据进行了分区,但是进行了范围分区。您想要的是对这些列进行哈希分区。

将方法更改为repartition并将这些列传递给它,它应确保相同的值组最终在一个分区中。

© www.soinside.com 2019 - 2024. All rights reserved.