I have the following data in a topic I created called "songTopic":

sid,Believer
ruc,Thunder

The first field is the username and the second is the name of the song the user frequently listens to. I have started ZooKeeper, the Kafka server, and a producer with the topic name mentioned above, and I entered the above data into the topic from the command line.
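For reference, I started the producer with a command along these lines (I'm on Windows; the exact flags vary by Kafka version, e.g. newer releases use --bootstrap-server instead of --broker-list):

kafka-console-producer.bat --broker-list localhost:9092 --topic songTopic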
My end goal is to apply a schema to this data, perform a count aggregation, and write the result back to a Kafka topic. I developed the following code for that purpose:
package com.sparkKafka

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

object SparkKafkaTopic2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SparkKafka").master("local[*]").getOrCreate()

    // Read the raw records from the songTopic topic, starting from the earliest offset
    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "songTopic")
      .option("startingOffsets", "earliest")
      .load()

    // Split the comma-separated value into userName and songName, keeping the record timestamp
    val newDf = df.select(
      split(col("value"), ",")(0).as("userName"),
      split(col("value"), ",")(1).as("songName"),
      col("timestamp"))

    // Windowed count with a 20-second watermark
    val windowedCount = newDf
      .withWatermark("timestamp", "20000 milliseconds")
      .groupBy(
        window(col("songName"), "timestamp", "10 seconds"))
      .agg(count(col("songName")))

    // Write the windowed counts to the console for debugging
    val query = windowedCount.writeStream
      .format("console")
      .option("truncate", "false")
      .outputMode("append")
      .start()

    query.awaitTermination()
  }
}
I have to accept data every 20 seconds, perform the aggregation, and then write the result back to a Kafka topic. The code above throws the following error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
Aggregate [window#32], [window#32 AS window#26, count(songName#22) AS count(songName)#31L]
+- Filter isnotnull(cast(songName#22 as timestamp))
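From reading the Spark docs, the signature of the window function is window(timeColumn, windowDuration[, slideDuration]), so I suspect my groupBy passes the arguments in the wrong order (the Filter on cast(songName#22 as timestamp) in the plan above suggests the window is being keyed on songName instead of the timestamp). Something like the following is what I would try, though I have not verified it:

newDf
  .withWatermark("timestamp", "20 seconds")
  .groupBy(
    window(col("timestamp"), "20 seconds"), // event-time column first, then the window duration
    col("songName"))
  .agg(count(col("songName")).as("count"))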
I need the output to show the count of each song per window. I even tried the Complete and Update values for the output mode, but to no avail. I am not sure what exactly is going on here. I have gone through several SO posts, but with no luck. I am new to Kafka integration and trying my best to learn, so please guide me.
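For completeness, this is the shape of the write-back stage I am aiming for once the aggregation works. It is only a sketch: the output topic songTopic2 and the checkpoint path are placeholders I made up, and it assumes the aggregation groups by both the window and songName and aliases the count to "count" (the Kafka sink requires a string or binary value column and a checkpoint location):

val kafkaQuery = windowedCount
  .select(to_json(struct(col("window"), col("songName"), col("count"))).as("value")) // serialize each row as JSON for the Kafka value
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "songTopic2")                           // placeholder output topic
  .option("checkpointLocation", "C:/tmp/spark-checkpoint") // required by the Kafka sink; placeholder path
  .outputMode("update")
  .start()

kafkaQuery.awaitTermination()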