我每天收到一个XML或CSV文件,其中包含所有数据(旧数据和新数据)。例如,如果Yesterday.xml包含3条记录,则Today.xml包含4条记录(3条旧记录和1条新记录)。
我只关心最后一个(新行),因为我只想处理新数据,旧数据每天都在处理。
使用Spark和Kafka实现此目的的最佳方法是什么?数据示例:
OpportunityNo, OpprotunityTitle,Field
--- yesterday data----
Row1:1,OppTit1,IT
Row2:2,OppTit2,HEALTH
Row3:3,OppTit3,Finance
-------today data---------
Row4:4,OppTit4,Engineering
附加说明:
我的代码在下面,但是它获取每个新文件上的所有数据到目录,我只想获取新数据而不是所有数据。
import org.apache.log4j._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{ StructType, StructField, IntegerType, DoubleType, StringType, TimestampType, DateType }
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.streaming.Trigger
object Demo {
def main(args: Array[String]) {
Logger.getLogger("org").setLevel(Level.ERROR)
val conf = new SparkConf()
conf.set("spark.app.name", "GrantAnalytics")
conf.set("spark.master", "local")
val sc = new SparkContext(conf)
val spark = SparkSession.builder().appName("GrantAnalytics").master("local[*]").getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", 5)
val schema = new StructType(Array(
new StructField("OpportunityID", IntegerType, true),
new StructField("OpportunityTitle", StringType, true),
new StructField("OpportunityNumber", StringType, true),
new StructField("CFDANumbers", DoubleType, true),
new StructField("CategoryOfFundingActivity", StringType, true)))
val streamingDF = spark.readStream.schema(schema).option("maxFilesPerTrigger", 1).option("header", "true").format("csv").load("C:/datasets/output/*.csv")
val query = streamingDF.select(concat(col("OpportunityID"), lit("~"), col("OpportunityTitle"), lit("~"), col("OpportunityNumber"), lit("~"), col("CFDANumbers"), lit("~"), col("CategoryOfFundingActivity")).alias("value")).writeStream.format("kafka").outputMode(OutputMode.Update()).option("kafka.bootstrap.servers", "localhost:9092").option("topic", "grants").option("checkpointLocation", "C:/deleteme/kafka/").start()
query.awaitTermination()
}
}
如果您想使用Kafka,最好在设计应用程序时考虑单个事件和独立事件。
这意味着,您应该将XML拆分为行,并按数据/时间戳对其进行过滤,以将每条消息发送给Kafka。在Kafka中,每条消息在示例数据中仅包含一行。最后,在第二天之后,Kafka应该包含以下四个消息:
1,OppTit1,IT
2,OppTit2,HEALTH
3,OppTit3,Finance
4,OppTit4,Engineering
此外,请确保为Kafka消息应用有用的密钥。
一旦Kafka中的数据作为单个事件可用,您就可以使用Spark消耗和处理仅引用当天的新消息,因为先前使用的消息将不再被消耗。