All, scala newbie question: I'm running into a problem passing a string value from a DataFrame as the argument to the spark.read.text() function.
The code below reads a directory listing into DataFrame df1, mapped to temp view flist. df1 contains only fpath and the modtime epoch. df2 uses spark.sql to convert the modtime epoch to a date-hour string. The last part is a nested loop: the outer loop iterates over the unique date-hour strings, and the inner loop queries df2 for the filenames belonging to each date-hour string.
However, passing fpath as the argument to spark.read.text(f) fails because fpath's type is "Any", where a String is required (??)
Question is: I'm not sure how to get it to accept the fpath (f) argument. Any input would be appreciated -- thanks very much for your time on this, sir.
Code and error follow:
// =======================
// get list of files in directory
// =======================
import org.apache.hadoop.fs.{FileSystem, Path}
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val srcpath=new Path("/data/gen1")
val files = fs.listStatus(srcpath)
// =======================
// Convert List of files to DataFrame
// =======================
// map files (array of hdfsfiles) to List
// convert List to RDD
// create schema (NB other syntaxes for this possible)
// convert RDD to DataFrame
// =======================
// list of unique yyyymmdd-hh24
// =======================
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val rows = files.map(t => Row(t.getPath().toString,(t.getModificationTime/1000).toLong)).toList
val rdd = spark.sparkContext.parallelize(rows)
val schema = StructType( StructField("FPath", StringType, true)::StructField("FModTime", LongType, true)::Nil)
val df1 = spark.createDataFrame(rdd, schema)
print(df1.schema)
df1.createOrReplaceTempView("flist")
// =======================
// transform raw data df1 to df2 (same data, but modtime epoch changed to 'yyyymmdd-hh')
// =======================
// nested loop
// first: unique "dayhr" (yyyymmdd-hh)
// second: files for each dayhr
// =======================
val df2 = spark.sql("select fpath,date_format(from_unixtime(fmodtime),'yyyyMMdd-hh') dayhr from flist") // .show()
for (row1 <- df2.select("dayhr").distinct().sort().rdd.collect) {
  val d = row1(0)
  println(s"files with moddate:${d}")
  for (row2 <- df2.select("fpath").where(df2("dayhr") === d).rdd.collect) {
    var f = row2(0)
    // ERROR OCCURS HERE:
    val df3 = spark.read.text(f)
    val cnt = df3.count()
    println(s"fpath:${f}:cnt:${cnt}")
  }
}
Output of the last section ("transform raw data...") from spark-shell (running Spark 2.4):
scala> // =======================
scala> // transform raw data df1 to df2 (same data, but modtime epoch changed to 'yyyymmdd-hh')
scala> // =======================
scala> // nested loop
scala> // first: unique "dayhr" (yyyymmdd-hh)
scala> // second: files for each dayhr
scala> // =======================
scala>
scala> val df2 = spark.sql("select fpath,date_format(from_unixtime(fmodtime),'yyyyMMdd-hh') dayhr from flist")
23/03/22 09:36:28 WARN conf.HiveConf: HiveConf of name hive.masking.algo does not exist
Hive Session ID = 2132ac31-496f-47c9-baec-e201b5621650
df2: org.apache.spark.sql.DataFrame = [fpath: string, dayhr: string]
scala> df2.show()
+--------------------+-----------+
| fpath| dayhr|
+--------------------+-----------+
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
|hdfs://ccycloud.m...|20230320-06|
+--------------------+-----------+
only showing top 20 rows
scala>
scala> for(row1<-df2.select("dayhr").distinct().sort().rdd.collect){
| val d = row1(0)
| println(s"files with moddate:${d}")
| for (row2<-df2.select("fpath").where(df2("dayhr")===d).rdd.collect) {
| var f = row2(0)
| val df3 = spark.read.text(f)
| val cnt = df3.count()
| println(s"fpath:${f}:cnt:${cnt}")
| }
| }
<console>:38: error: overloaded method value text with alternatives:
(paths: String*)org.apache.spark.sql.DataFrame <and>
(path: String)org.apache.spark.sql.DataFrame
cannot be applied to (Any)
val df3 = spark.read.text(f)
^
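Update, for anyone hitting the same thing: the culprit seems to be that Row.apply(0) (i.e. row2(0)) is statically typed as Any, so the compiler can't match either String overload of text(). A sketch of what I believe should work (untested against this exact cluster) is to extract the value with Row.getString, or to skip Row entirely by collecting a typed Dataset[String] (the .as[String] call relies on spark.implicits._, which spark-shell imports automatically):

```scala
// Option 1: pull the value out of the Row with its static type
val f = row2.getString(0)   // String instead of Any
val df3 = spark.read.text(f)

// Option 2: avoid Row altogether -- collect a typed Dataset[String]
for (f <- df2.select("fpath").where(df2("dayhr") === d).as[String].collect) {
  val df3 = spark.read.text(f)
  println(s"fpath:${f}:cnt:${df3.count()}")
}
```

Since spark.read.text also accepts varargs (paths: String*), another option would be to collect all paths for a given dayhr into an Array[String] and pass them in one call as spark.read.text(paths: _*).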