How do I split a Spark DataFrame into two, using a given ratio in months derived from a unix-epoch column?

Question · votes: 0 · answers: 2

I want to split a Spark DataFrame into two using a ratio given in months, based on a unix-epoch column.

A sample of the DataFrame is shown below:

unixepoch
---------
1539754800
1539754800
1539931200
1539927600
1539927600
1539931200
1539931200
1539931200
1539927600
1540014000
1540014000
1540190400
1540190400
1540190400
1540190400
1540190400
1540190400
1540190400
1540190400
1540190400
1540190400
1540190400
1540190400
1540190400

Split strategy

If the given data spans a total of, say, 30 months and the split ratio is 0.6, then the expected DataFrame 1 should hold 30 * 0.6 = 18 months of data and the expected DataFrame 2 should hold 30 * 0.4 = 12 months of data.

EDIT-1

Most of the answers split on record count, i.e. if total records = 100 and the split ratio = 0.6, then split-1 DF ≈ 60 records and split-2 DF ≈ 40 records. Here, however, the ratio applies to months, which can be computed from the unix-epoch timestamp column in the sample DataFrame above. Say the epoch column covers some distribution over 30 months; then I want the rows for the first 18 months of epochs in DataFrame 1 and the rows for the last 12 months in DataFrame 2. You can think of this as splitting time-series data in Spark.

EDIT-2

If the given data runs from July 2018 to May 2019 = 10 months of data, then split1 (0.6 = first 6 months) = (July 2018, January 2019) and split2 (0.4 = last 4 months) = (February 2019, May 2019). There should be no random sampling.
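The month arithmetic described above can be sketched in plain Scala, without Spark, to make the intended boundary explicit. This is only an illustration: `splitCutoff` is a hypothetical helper, and whether the boundary month itself lands in the first or the second split is an assumption (the answers below differ on that convention).

```scala
import java.time.{Instant, YearMonth, ZoneOffset}
import java.time.temporal.ChronoUnit

// Hypothetical helper: given min/max epoch seconds (interpreted in UTC) and a
// train ratio, return the first YearMonth belonging to the second split.
def splitCutoff(minEpoch: Long, maxEpoch: Long, ratio: Double): YearMonth = {
  def ym(epoch: Long): YearMonth =
    YearMonth.from(Instant.ofEpochSecond(epoch).atZone(ZoneOffset.UTC))
  // month difference between the two bounding months, e.g. Jul 2018 -> May 2019 = 10
  val totalMonths = ChronoUnit.MONTHS.between(ym(minEpoch).atDay(1), ym(maxEpoch).atDay(1))
  val trainMonths = math.round(totalMonths * ratio) // 10 * 0.6 = 6
  ym(minEpoch).plusMonths(trainMonths)              // Jul 2018 + 6 months = Jan 2019
}
```

With the example above (July 2018 to May 2019, ratio 0.6) this yields January 2019 as the boundary month; all rows whose month precedes the boundary go to split 1 and the rest to split 2, under whichever inclusive/exclusive convention you pick.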

scala apache-spark apache-spark-sql apache-spark-ml
2 Answers
2
votes

Use row_number & filter to split the data into two DataFrames.

scala> val totalMonths = 10
totalMonths: Int = 10

scala> val splitRatio = 0.6
splitRatio: Double = 0.6

scala> val condition = (totalMonths * splitRatio).floor + 1
condition: Double = 7.0

scala> epochDF.show(false)
+----------+-----+
|dt        |month|
+----------+-----+
|1530383400|7    |
|1533061800|8    |
|1535740200|9    |
|1538332200|10   |
|1541010600|11   |
|1543602600|12   |
|1546281000|1    |
|1548959400|2    |
|1551378600|3    |
|1554057000|4    |
|1556649000|5    |
+----------+-----+

scala> import org.apache.spark.sql.expressions._
import org.apache.spark.sql.expressions._

scala> epochDF.orderBy($"dt".asc).withColumn("id",row_number().over(Window.orderBy($"dt".asc))).filter($"id" <= condition).show(false)

+----------+-----+---+
|dt        |month|id |
+----------+-----+---+
|2018-07-01|7    |1  |
|2018-08-01|8    |2  |
|2018-09-01|9    |3  |
|2018-10-01|10   |4  |
|2018-11-01|11   |5  |
|2018-12-01|12   |6  |
|2019-01-01|1    |7  |
+----------+-----+---+

scala> epochDF.orderBy($"dt".asc).withColumn("id",row_number().over(Window.orderBy($"dt".asc))).filter($"id" > condition).show(false)

+----------+-----+---+
|dt        |month|id |
+----------+-----+---+
|2019-02-01|2    |8  |
|2019-03-01|3    |9  |
|2019-04-01|4    |10 |
|2019-05-01|5    |11 |
+----------+-----+---+



-2
votes

I split the data by months; if only one month of data is given, it is split by days instead.

I prefer this approach because it does not rely on window functions. The other answer here uses a Window without partitionBy, which shuffles all the data to a single executor and severely degrades performance.

1. Split method, given a train ratio in months

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DataTypes

val EPOCH = "epoch"

def splitTrainTest(inputDF: DataFrame,
                   trainRatio: Double): (DataFrame, DataFrame) = {
  require(trainRatio >= 0 && trainRatio <= 0.9, s"trainRatio must be between 0 and 0.9, found: $trainRatio")

  def extractDateCols(tuples: (String, Column)*): DataFrame = {
    tuples.foldLeft(inputDF) {
      case (df, (dateColPrefix, dateColumn)) =>
        df
          .withColumn(s"${dateColPrefix}_month", month(from_unixtime(dateColumn))) // month
          .withColumn(s"${dateColPrefix}_dayofmonth", dayofmonth(from_unixtime(dateColumn))) // day of month
          .withColumn(s"${dateColPrefix}_year", year(from_unixtime(dateColumn))) // year
    }
  }

  val extractDF = extractDateCols((EPOCH, inputDF(EPOCH)))

  val yearCol = s"${EPOCH}_year"
  val monthCol = s"${EPOCH}_month"
  val dayCol = s"${EPOCH}_dayofmonth"
  val SPLIT = "split"

  // derive min/max(yyyy-MM)
  val dateCol = to_date(date_format(
    concat_ws("-", Seq(yearCol, monthCol).map(col): _*), "yyyy-MM-01"))

  val minMaxDF = extractDF.agg(max(dateCol).as("max_date"), min(dateCol).as("min_date"))
  val min_max_date = minMaxDF.head()
  import java.sql.{Date => SqlDate}
  val minDate = min_max_date.getAs[SqlDate]("min_date")
  val maxDate = min_max_date.getAs[SqlDate]("max_date")

  println(s"Min Date Found: $minDate")
  println(s"Max Date Found: $maxDate")

  // total number of months for which data exist
  val totalMonths = (maxDate.toLocalDate.getYear - minDate.toLocalDate.getYear) * 12 +
    maxDate.toLocalDate.getMonthValue - minDate.toLocalDate.getMonthValue
  println(s"Total Months of data found for is $totalMonths months")

  // month offset starts at 0
  val splitDF = extractDF.withColumn(SPLIT, round(months_between(dateCol, to_date(lit(minDate)))).cast(DataTypes.IntegerType))

  val (trainDF, testDF) = totalMonths match {
    // data is provided for more than one month
    case tm if tm > 0 =>
      val trainMonths = Math.round(totalMonths * trainRatio)
      println(s"Data considered for training is < $trainMonths months")
      println(s"Data considered for testing is >= $trainMonths months")
      (splitDF.filter(col(SPLIT) < trainMonths), splitDF.filter(col(SPLIT) >= trainMonths))

    // data is provided for a single month: split on total records in terms of days
    case tm if tm == 0 =>
      val splitDF1 = splitDF.withColumn(SPLIT,
        datediff(date_format(
          concat_ws("-", Seq(yearCol, monthCol, dayCol).map(col): _*), "yyyy-MM-dd"), lit(minDate))
      )
      // total number of days for which data exist
      val totalDays = splitDF1.select(max(SPLIT).as("total_days")).head.getAs[Int]("total_days")
      if (totalDays <= 1) {
        throw new RuntimeException(s"Insufficient data provided for training: data found for $totalDays days, " +
          s"but more than 1 day is required")
      }
      println(s"Total Days of data found is $totalDays days")

      val trainDays = Math.round(totalDays * trainRatio)
      (splitDF1.filter(col(SPLIT) < trainDays), splitDF1.filter(col(SPLIT) >= trainDays))

    // there must be data
    case _ => throw new RuntimeException(s"Insufficient data provided for training: data found for $totalMonths " +
      s"months, but at least 1 month is required")
  }
  (trainDF.cache(), testDF.cache())
}

2. Testing with data for multiple months across years

// call the method
val implicits = sqlContext.sparkSession.implicits
import implicits._
val monthData = sc.parallelize(Seq(
  1539754800,
  1539754800,
  1539931200,
  1539927600,
  1539927600,
  1539931200,
  1539931200,
  1539931200,
  1539927600,
  1540449600,
  1540449600,
  1540536000,
  1540536000,
  1540536000,
  1540424400,
  1540424400,
  1540618800,
  1540618800,
  1545979320,
  1546062120,
  1545892920,
  1545892920,
  1545892920,
  1545201720,
  1545892920,
  1545892920
)).toDF(EPOCH)

val (split1, split2) = splitTrainTest(monthData, 0.6)
split1.show(false)
split2.show(false)

    /**
      * Min Date Found: 2018-10-01
      * Max Date Found: 2018-12-01
      * Total Months of data found for is 2 months
      * Data considered for training is < 1 months
      * Data considered for testing is >= 1 months
      * +----------+-----------+----------------+----------+-----+
      * |epoch     |epoch_month|epoch_dayofmonth|epoch_year|split|
      * +----------+-----------+----------------+----------+-----+
      * |1539754800|10         |17              |2018      |0    |
      * |1539754800|10         |17              |2018      |0    |
      * |1539931200|10         |19              |2018      |0    |
      * |1539927600|10         |19              |2018      |0    |
      * |1539927600|10         |19              |2018      |0    |
      * |1539931200|10         |19              |2018      |0    |
      * |1539931200|10         |19              |2018      |0    |
      * |1539931200|10         |19              |2018      |0    |
      * |1539927600|10         |19              |2018      |0    |
      * |1540449600|10         |25              |2018      |0    |
      * |1540449600|10         |25              |2018      |0    |
      * |1540536000|10         |26              |2018      |0    |
      * |1540536000|10         |26              |2018      |0    |
      * |1540536000|10         |26              |2018      |0    |
      * |1540424400|10         |25              |2018      |0    |
      * |1540424400|10         |25              |2018      |0    |
      * |1540618800|10         |27              |2018      |0    |
      * |1540618800|10         |27              |2018      |0    |
      * +----------+-----------+----------------+----------+-----+
      *
      * +----------+-----------+----------------+----------+-----+
      * |epoch     |epoch_month|epoch_dayofmonth|epoch_year|split|
      * +----------+-----------+----------------+----------+-----+
      * |1545979320|12         |28              |2018      |2    |
      * |1546062120|12         |29              |2018      |2    |
      * |1545892920|12         |27              |2018      |2    |
      * |1545892920|12         |27              |2018      |2    |
      * |1545892920|12         |27              |2018      |2    |
      * |1545201720|12         |19              |2018      |2    |
      * |1545892920|12         |27              |2018      |2    |
      * |1545892920|12         |27              |2018      |2    |
      * +----------+-----------+----------------+----------+-----+
      */

3. Testing with data for a single month of one year

val oneMonthData = sc.parallelize(Seq(
  1589514575, // Friday, May 15, 2020 3:49:35 AM
  1589600975, // Saturday, May 16, 2020 3:49:35 AM
  1589946575, // Wednesday, May 20, 2020 3:49:35 AM
  1590378575, // Monday, May 25, 2020 3:49:35 AM
  1590464975, // Tuesday, May 26, 2020 3:49:35 AM
  1590470135  // Tuesday, May 26, 2020 5:15:35 AM
)).toDF(EPOCH)

val (split3, split4) = splitTrainTest(oneMonthData, 0.6)
split3.show(false)
split4.show(false)

    /**
      * Min Date Found: 2020-05-01
      * Max Date Found: 2020-05-01
      * Total Months of data found for is 0 months
      * Total Days of data found is 25 days
      * +----------+-----------+----------------+----------+-----+
      * |epoch     |epoch_month|epoch_dayofmonth|epoch_year|split|
      * +----------+-----------+----------------+----------+-----+
      * |1589514575|5          |15              |2020      |14   |
      * +----------+-----------+----------------+----------+-----+
      *
      * +----------+-----------+----------------+----------+-----+
      * |epoch     |epoch_month|epoch_dayofmonth|epoch_year|split|
      * +----------+-----------+----------------+----------+-----+
      * |1589600975|5          |16              |2020      |15   |
      * |1589946575|5          |20              |2020      |19   |
      * |1590378575|5          |25              |2020      |24   |
      * |1590464975|5          |26              |2020      |25   |
      * |1590470135|5          |26              |2020      |25   |
      * +----------+-----------+----------------+----------+-----+
      */