spark scala dataframe - data type conversion problem

Problem description

All, a Scala newbie question: I'm running into a problem converting a string data type from a DataFrame into the argument passed to the spark.read.text() function.

The code below:
- reads a directory listing into DataFrame df1 and maps it to the temp view flist; df1 contains only fpath and the modtime epoch
- builds df2 with spark.sql, converting the modtime epoch to a date-hour string
- ends with a nested loop: the outer loop iterates over the unique date-hour strings, and the inner loop queries df2 for the file names belonging to each date-hour string

However, passing fpath as the argument to spark.read.text(f) fails because fpath comes back with data type "Any" while a String is required (??)

The question is: I'm not sure how to get it to accept the fpath (f) argument. Any input would be much appreciated - thanks very much for your time on this.
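For context, f in the loop shown below is taken from row2(0); Row#apply is declared to return Any, while the typed Row getters return String. A minimal illustration (row2 here stands for one of the collected Rows):

val f1: Any    = row2(0)                       // untyped apply - the compiler only knows this is Any
val f2: String = row2.getString(0)             // typed by position
val f3: String = row2.getAs[String]("fpath")   // typed by column name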

Code and error below:

// =======================
// get list of files in directory
// =======================

import org.apache.hadoop.fs.{FileSystem, Path}
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val srcpath=new Path("/data/gen1")
val files = fs.listStatus(srcpath)

// =======================
// Convert List of files to DataFrame
// =======================
// map files (array of hdfsfiles) to List
// convert List to RDD
// create schema    (NB other syntaxes for this possible)
// convert RDD to DataFrame
// =======================
// list of unique yyyymmdd-hh24
// =======================

import org.apache.spark.sql._
import org.apache.spark.sql.types._
val rows = files.map(t => Row(t.getPath().toString,(t.getModificationTime/1000).toLong)).toList
val rdd = spark.sparkContext.parallelize(rows)
val schema = StructType( StructField("FPath", StringType, true)::StructField("FModTime", LongType, true)::Nil)
val df1 = spark.createDataFrame(rdd, schema)
print(df1.schema)
df1.createOrReplaceTempView("flist")
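
// As an aside, a sketch of a more compact way to build the same DataFrame,
// relying on the implicit encoders instead of an explicit RDD and schema.
// This assumes import spark.implicits._ is in scope (as it is in spark-shell)
// and is not part of the original code; df1b is just an illustrative name.

import spark.implicits._
val df1b = files.map(t => (t.getPath.toString, t.getModificationTime / 1000))
  .toSeq                                       // Array[(String, Long)] -> Seq for toDF
  .toDF("fpath", "fmodtime")                   // same two columns as df1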

// =======================
// transform raw data df1 to df2 (same data, but modtime epoch changed to 'yyyymmdd-hh')
// =======================
// nested loop
// first: unique "dayhr" (yyyymmdd-hh)
// second: files for each dayhr
// =======================

val df2 = spark.sql("select fpath,date_format(from_unixtime(fmodtime),'yyyyMMdd-hh') dayhr from flist")                                  // .show()

for(row1<-df2.select("dayhr").distinct().sort().rdd.collect){
  val d = row1(0)
  println(s"files with moddate:${d}")
  for (row2<-df2.select("fpath").where(df2("dayhr")===d).rdd.collect) {
    var f  = row2(0)
// ERROR OCCURS HERE:
    val df3 = spark.read.text(f)
    val cnt = df3.count()
    println(s"fpath:${f}:cnt:${cnt}")
  }
}
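For comparison, a sketch of the same nested loop using typed column access so that f is already a String when it reaches spark.read.text (the .as[String] call assumes import spark.implicits._ is in scope; this rewrite is an illustration, not the original code):

import spark.implicits._

for (row1 <- df2.select("dayhr").distinct().sort("dayhr").collect()) {
  val d = row1.getString(0)                                          // typed getter instead of row1(0)
  println(s"files with moddate:${d}")
  // collect the single fpath column as Array[String]
  val paths = df2.select("fpath").where(df2("dayhr") === d).as[String].collect()
  for (f <- paths) {
    val cnt = spark.read.text(f).count()                             // f is a String, so the overload resolves
    println(s"fpath:${f}:cnt:${cnt}")
  }
}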


Output of the last section of the original code ("transform raw data ...") from spark-shell (running Spark 2.4):

scala> // =======================
scala> // transform raw data df1 to df2 (same data, but modtime epoch changed to 'yyyymmdd-hh')
scala> // =======================
scala> // nested loop
scala> // first: unique "dayhr" (yyyymmdd-hh)
scala> // second: files for each dayhr
scala> // =======================
scala> 
scala> val df2 = spark.sql("select fpath,date_format(from_unixtime(fmodtime),'yyyyMMdd-hh') dayhr from flist")                                  
23/03/22 09:36:28 WARN conf.HiveConf: HiveConf of name hive.masking.algo does not exist
Hive Session ID = 2132ac31-496f-47c9-baec-e201b5621650
df2: org.apache.spark.sql.DataFrame = [fpath: string, dayhr: string]

scala> df2.show()
+--------------------+-----------+                                              
|               fpath|      dayhr|
+--------------------+-----------+
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
+--------------------+-----------+
only showing top 20 rows


scala> 

scala> for(row1<-df2.select("dayhr").distinct().sort().rdd.collect){
     |   val d = row1(0)
     |   println(s"files with moddate:${d}")
     |   for (row2<-df2.select("fpath").where(df2("dayhr")===d).rdd.collect) {
     |     var f  = row2(0)
     |     val df3 = spark.read.text(f)
     |     val cnt = df3.count()
     |     println(s"fpath:${f}:cnt:${cnt}")
     |   }
     | }



<console>:38: error: overloaded method value text with alternatives:
  (paths: String*)org.apache.spark.sql.DataFrame <and>
  (path: String)org.apache.spark.sql.DataFrame
 cannot be applied to (Any)
           val df3 = spark.read.text(f)
                                ^
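
The error itself lists a (paths: String*) overload, so another option (again a sketch under the same assumptions, with d coming from the outer loop above) is to collect every path for a given hour and read them in a single call:

import spark.implicits._
val pathsForHour = df2.select("fpath").where(df2("dayhr") === d).as[String].collect()
val dfHour = spark.read.text(pathsForHour: _*)      // varargs overload: one DataFrame over all files for the hour
println(s"dayhr:${d}:files:${pathsForHour.length}:lines:${dfHour.count()}")

This also avoids launching a separate read job per file, at the cost of getting one combined line count per hour instead of one count per file.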



Tags: apache-spark-sql