Unable to design a solution to split a dataframe [duplicate]

A local directory contains 1000 log files generated by another application (which runs once a day). Using Scala I can pick the latest files (the ones generated on the current day) and move them to HDFS. To do that I came up with the following code:

import java.io.File
import java.text.SimpleDateFormat
import java.time.{Duration, Instant}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("yarn").appName("AutoCheck").enableHiveSupport().getOrCreate()
import spark.implicits._

val t = (x:Long) => { new SimpleDateFormat("yyyy-MM-dd").format(x)}
def getFileTree(f: File): Stream[File] =
  f #:: (if (f.isDirectory) f.listFiles().toStream.flatMap(getFileTree)
  else Stream.empty)

val simpDate = new java.text.SimpleDateFormat("yyyy-MM-dd")
val currDate = simpDate.format(new java.util.Date())
val now = Instant.now                                                                           // Gets current date in the format: 2017-12-13T09:40:29.920Z
val today = now.toEpochMilli
val yesterday = now.minus(Duration.ofDays(1))
val yesterdayMilliSec = yesterday.toEpochMilli
val todaySimpDate = t(today)
val yesterdaySimpDate = t(yesterdayMilliSec)
val local:String = "file://"
val folders = getFileTree(new File("/tmp/hive_audits/")).filterNot(_.getName.endsWith(".log"))  // Gets the date of folder
val folderCrtDateDesc = folders.toList.map(y => (y,y.lastModified)).sortBy(-_._2)
val latestFolder = folderCrtDateDesc.map(y=>(y._1,t(y._2)))
val folderToday = latestFolder.filter(y => y._2==todaySimpDate)
val localFiles = getFileTree(new File("/tmp/hive_audits/")).filter(_.getName.endsWith(".log"))
val fileCrtDateDesc = localFiles.toList.map(y => (y,y.lastModified)).sortBy(-_._2)
val latestFiles = fileCrtDateDesc.toList.map(y => (y._1,t(y._2)))
val filesToday = latestFiles.filter(y => y._2==todaySimpDate)

val localFileNames = filesToday.map(y => local+y._1)
val fileName = localFileNames(2).split("/")(6)
val hadoopConf = new Configuration()
val hdfs = FileSystem.get(hadoopConf)
val localPath = new Path(localFileNames(2))
val hdfsPath = new Path(s"hdfs://devusr/user/etllogs/dailylogs/${fileName}")
hdfs.copyFromLocalFile(localPath,hdfsPath)
val fileDF = spark.read.text("/user/fdlhdpetl/dailylogs")
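
For completeness, a minimal sketch (reusing the filesToday list and the hdfs handle defined above, and assuming the same hdfs://devusr/user/etllogs/dailylogs target directory) that copies every file from the current day rather than the single indexed entry:

    // Copy each of today's log files into the HDFS landing directory.
    filesToday.foreach { case (file, _) =>
      val src = new Path(file.getAbsolutePath)
      val dst = new Path(s"hdfs://devusr/user/etllogs/dailylogs/${file.getName}")
      hdfs.copyFromLocalFile(src, dst)
    }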

With the code above I can copy the files from local to HDFS. Every file in the directory contains a status message, and there are three kinds of status: "Error", "Failed", "Success". I need to open each log file, read its contents, and take further action based on the status. After loading the files into HDFS, I read the directory "/user/etllogs/dailylogs" in Spark so that it becomes a single dataframe containing all the files. The data in the three kinds of status files looks like this:

JobID: 454
[Wed Dec 27 05:38:47 UTC 2017] INFO: Starting Auditing for : baseTable1
[Wed Dec 27 05:38:49 UTC 2017] SEVERE: Error while compiling statement: FAILED: SemanticException [Error 10004]: Line 1:261 Invalid table alias or column
[Wed Dec 27 05:38:49 UTC 2017] INFO:
Completed Auditing for : baseTable1
[Wed Dec 27 05:38:49 UTC 2017] INFO: Updating the job keeper...
JobID: 455
[Wed Dec 27 05:38:18 UTC 2017] INFO: Starting Auditing for : baseTable2
[Wed Dec 27 05:38:19 UTC 2017] INFO: Connections established to gp and finance ...
[Wed Dec 27 05:38:20 UTC 2017] INFO: Starting the auditing for the intial fetched set of records...
[Wed Dec 27 05:38:20 UTC 2017] INFO: Number of pk columns in the src table: 16. Number of PK Columns in the dest table: 16
[Wed Dec 27 05:38:20 UTC 2017] INFO: Success
Completed Auditing for : baseTable2
[Wed Dec 27 05:38:49 UTC 2017] INFO: Updating the job keeper...
JobID: 547
[Wed Dec 27 05:38:18 UTC 2017] INFO: Starting Auditing for : baseTable3
[Wed Dec 27 05:38:19 UTC 2017] INFO: Connections established to gp and finance ...
[Wed Dec 27 05:38:20 UTC 2017] INFO: Starting the auditing for the intial fetched set of records...
[Wed Dec 27 05:38:20 UTC 2017] INFO: Number of pk columns in the src table: 16. Number of PK Columns in the dest table: 5
[Wed Dec 27 05:38:20 UTC 2017] INFO: Failed. Invalid data found.
Completed Auditing for : baseTable3
[Wed Dec 27 05:38:49 UTC 2017] INFO: Updating the job keeper...

Each file in the dataframe starts with the line JobID: and ends with the line INFO: Updating the job keeper...

I now have the data of all the files in a single dataframe (fileDF), but I cannot come up with a solution for reading the data out of it.
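
To illustrate why the line-per-row layout is awkward (a purely illustrative sketch against the fileDF built above, not a solution), filtering it directly only isolates individual matching lines, detached from the JobID block they belong to:

    // Each row of fileDF is one physical log line, so this picks up only the
    // lines that literally contain "Error", without their surrounding job block.
    // Assumes `import spark.implicits._` from the code above for the $ syntax.
    val errorLines = fileDF.filter($"value".contains("Error"))
    errorLines.show(false)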

Is there a way I can split the dataframe into three different dataframes (one each for 'Error', 'Success' and 'Failed'), or is there a better way to create three separate dataframes per status without loading everything into a single dataframe first? Another option would be to read the files with Scala, prepare files with the corresponding content, and then load those into Spark for further processing.

Could anyone suggest an efficient way to handle this situation?

scala apache-spark
1 Answer

Once you have created a dataframe (fileDF in your case), it is going to be difficult to separate the log lines for Success, Failed and Error by looking at strings like JobID: and INFO: Updating the job keeper..., because every line of your log files is treated as a separate row in the dataframe.

I would suggest using the sparkContext.wholeTextFiles API instead: read each file as a whole and manipulate the resulting RDD so that every log block, from JobID: down to INFO: Updating the job keeper..., is reduced to a single element:

    val rdd = sparkContext.wholeTextFiles("/user/fdlhdpetl/dailylogs")
      .flatMap(x => x._2.replace("\n", "#[#").split("JobID:"))
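
To sanity-check the shape of these reduced elements (purely an illustrative peek, reusing the rdd just built; note that splitting on JobID: also leaves an empty leading element per file, which the status filters below discard anyway):

    // Print the first couple of non-empty blocks; each should be one whole
    // job section with "#[#" standing in for the original newlines.
    rdd.filter(_.trim.nonEmpty).take(2).foreach(println)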

The next step is to filter those elements into three RDDs:

    rdd.filter(x => x.contains("Success"))
    rdd.filter(x => x.contains("Failed."))
    rdd.filter(x => x.contains("Error"))

Then you will have to create a schema, convert each filtered RDD to an RDD[Row], and use the schema to build the three dataframes:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val simpleSchema = StructType(Seq(StructField("column", StringType)))

    val successDF = sqlContext.createDataFrame(
      rdd.filter(x => x.contains("Success"))
         .map(x => Row.fromSeq(Seq("JobID:" + x.replace("...#[#", "...").replace("#[#", "\n")))),
      simpleSchema)

    val failedDF = sqlContext.createDataFrame(
      rdd.filter(x => x.contains("Failed."))
         .map(x => Row.fromSeq(Seq("JobID:" + x.replace("...#[#", "...").replace("#[#", "\n")))),
      simpleSchema)

    val errorDF = sqlContext.createDataFrame(
      rdd.filter(x => x.contains("Error"))
         .map(x => Row.fromSeq(Seq("JobID:" + x.replace("...#[#", "...").replace("#[#", "\n")))),
      simpleSchema)

Thus you have three dataframes, one per status. I hope the answer is helpful.
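
This is not part of the answer above, but as a sketch under the assumption that Spark 2.x and the question's spark session are available, the same idea can be expressed with a Dataset[String] instead of a manually-built Row schema:

    // Same "#[#" placeholder trick, using the SparkSession from the question.
    import spark.implicits._

    val blocks = spark.sparkContext
      .wholeTextFiles("/user/fdlhdpetl/dailylogs")
      .flatMap { case (_, content) => content.replace("\n", "#[#").split("JobID:") }
      .filter(_.trim.nonEmpty)
      .map(block => "JobID:" + block.replace("#[#", "\n"))
      .toDS()                                   // Dataset[String], one row per job block

    val successDS = blocks.filter(_.contains("Success"))
    val failedDS  = blocks.filter(_.contains("Failed."))
    val errorDS   = blocks.filter(_.contains("Error"))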
