Process each row to get the dates

Question  Votes: -2  Answers: 2

I have a file containing a Year column and month columns Mon01, Mon02, and so on.

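The month is taken from the last two characters of the column name (i.e. the 01 in MON01). The text value in each month column (MON01, MON02, ...) has as many characters as the month has days. For every 1 in that value, the position at which it occurs is the day of the date to retrieve.

For example: 2018-01-02 (day 02 because a 1 occurs at the second position), 2018-01-03, 2018-01-07.

How can we get this result in Spark with Scala?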

scala apache-spark apache-spark-sql rdd apache-spark-dataset
2 Answers
1
Votes
    import org.apache.spark.sql.functions.{lit, udf}

    // UDF: for each '1' in value, emit "year-month-day", where day is the
    // 1-based position of that character, zero-padded to two digits
    val myudf = (month: String, year: String, value: String) => {
      val month1 = month.replaceAll("[A-Za-z]+", "") // "Mon01" -> "01"
      value.zipWithIndex.collect {
        case ('1', idx) => f"$year-$month1-${idx + 1}%02d"
      }.toList
    }

    // Main method. Without inferSchema every column is read as a string,
    // so the leading zeros in the month columns are preserved.
    val data = spark.read.option("header", "true").csv("data.csv")
    data.show()
    // +----+-----+-----+
    // |Year|Mon01|Mon02|
    // +----+-----+-----+
    // |2018|01110|00111|
    // |2019|01100|00001|
    // +----+-----+-----+

    val myCustomUdf = udf(myudf)
    val monthCols = data.columns.drop(1) // every column except Year
    val requiredDF = monthCols.foldLeft(data) {
      case (df, month) =>
        df.withColumn("Date_" + month, myCustomUdf(lit(month), df("Year"), df(month)))
    }
    requiredDF.show(false)
    // +----+-----+-----+------------------------------------+------------------------------------+
    // |Year|Mon01|Mon02|Date_Mon01                          |Date_Mon02                          |
    // +----+-----+-----+------------------------------------+------------------------------------+
    // |2018|01110|00111|[2018-01-02, 2018-01-03, 2018-01-04]|[2018-02-03, 2018-02-04, 2018-02-05]|
    // |2019|01100|00001|[2019-01-02, 2019-01-03]            |[2019-02-05]                        |
    // +----+-----+-----+------------------------------------+------------------------------------+

I hope this helps.
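
The per-row logic can also be tried outside Spark. The following is only a minimal sketch: `FlagDates` and `flaggedDates` are hypothetical names that do not appear in the answer above, and it assumes the column name ends in a two-digit month and that the year is available as an integer. It uses `java.time.LocalDate` so that an impossible date (for example a 1 at position 30 in a February column) raises an exception instead of producing a malformed string.

```scala
import java.time.LocalDate

object FlagDates {
  // Hypothetical helper: maps a flag string such as "01110" to ISO dates,
  // treating the 1-based position of each '1' as the day of the month.
  def flaggedDates(year: Int, monthCol: String, flags: String): Seq[String] = {
    val month = monthCol.takeRight(2).toInt // "Mon01" -> 1
    flags.zipWithIndex.collect {
      // LocalDate.of throws DateTimeException for dates that do not exist
      case ('1', idx) => LocalDate.of(year, month, idx + 1).toString
    }
  }

  def main(args: Array[String]): Unit = {
    // First sample row of the answer above, column Mon01
    println(flaggedDates(2018, "Mon01", "01110").mkString(", "))
  }
}
```

If this behaviour is what is wanted, the same function body could be wrapped with `udf(...)` in place of the string-based version, at the cost of failing the job on invalid input rather than silently emitting it.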


-1
Votes
  1. Get the list of columns

    val cols = df.columns.toList
    val res = cols.foldLeft(...)

    {
      val filterDF = colWhere.foldLeft(pDYF) { (tmpDF, colName) => {
        val cn = s"${colName}_${colName}"
        val v: Option[String] =
          if (colName == "countries" || colName == "states" || colName == "zipCodes" || colName == "genders" || colName == "providers") {
            val vc = tmpDF.first().getAs[String](colName).asInstanceOf[mutable.WrappedArray[String]].map(x => x).toArray
            val vc1: Option[String] = if (vc.length == 0) None else Some(vc.map(i => s"$colName" + " = '" + i.toString + "'").mkString(" or "))
            vc1
          } else {
            val vc = tmpDF.first().getAs[Long](colName).asInstanceOf[mutable.WrappedArray[Long]].map(x => x).toArray
            val vc1: Option[String] = if (vc.length == 0) None else Some(vc.map(i => s"$colName" + " = " + i.toString).mkString(" or "))
            vc1
          }
        tmpDF.withColumn(cn, lit(v.getOrElse("")))
      }}
    }