Unable to group on a specific column with Spark and Kafka

Problem description

I have the following data in a Kafka topic I created, named "songTopic":

sid,Believer   
ruc,Thunder

The first field is the username and the second is the name of the song that user listens to most often. I started Zookeeper, the Kafka server, and a producer for the topic named above, and entered the data shown above into the topic from the command line. My end goal is to apply a schema to this data, perform a count aggregation, and write the result back to a Kafka topic. I wrote the following code for that purpose:

package com.sparkKafka
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
object SparkKafkaTopic2 {
  def main(ar: Array[String]) {
    val spark = SparkSession.builder().appName("SparkKafka").master("local[*]").getOrCreate()

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "songTopic").option("startingOffsets", "earliest")
      .load()

    val newDf = df.select(split(col("value"), ",")(0).as("userName"), split(col("value"), ",")(1).as("songName"), col("timestamp"))
    val windowedCount = newDf
      .withWatermark("timestamp", "20000 milliseconds")
      .groupBy(
        window(col("songName"), "timestamp", "10 seconds"))
      .agg(count(col("songName")))
    val query =
      windowedCount.writeStream
        .format("console")
        .option("truncate", "false")
        .outputMode("append").start().awaitTermination()
  }
}

I need to accept data every 20 seconds, perform the aggregation, and write the result back to a Kafka topic. The code above throws the following error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
Aggregate [window#32], [window#32 AS window#26, count(songName#22) AS count(songName)#31L]
+- Filter isnotnull(cast(songName#22 as timestamp))  

I need output like the following: enter image description here. I even tried the Complete and Update output modes, but neither worked. I'm not sure what exactly is going on here. I referred to several SO posts, but with no luck. I'm new to Kafka integration and doing my best to learn, so please guide me.

scala apache-spark hadoop apache-kafka streaming
1 Answer
From the manual:
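(The rest of this answer appears to be missing from this copy.) For reference, a minimal sketch of the usual fix, assuming the goal is a per-song count over 10-second event-time windows: window() takes the timestamp column first (not the grouping column), the song name goes to groupBy as a separate key, and the watermark must be defined on that same timestamp column for append mode to be allowed:

// Sketch only: group by an event-time window on "timestamp" plus the "songName" key.
// window() expects the time column and a window duration, e.g. window(col("timestamp"), "10 seconds").
val windowedCount = newDf
  .withWatermark("timestamp", "20 seconds")
  .groupBy(
    window(col("timestamp"), "10 seconds"),  // event-time window
    col("songName"))                         // grouping key
  .agg(count(col("songName")).as("count"))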
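To write the aggregate back to Kafka (the asker's stated end goal), the Kafka sink needs a string or binary "value" column and a checkpoint location. A rough sketch follows; the output topic name "songCounts" and the checkpoint path are hypothetical placeholders:

// Sketch only: serialize each aggregated row to JSON and publish it to a Kafka topic.
// "songCounts" and the checkpoint path below are placeholders, not from the original post.
val query = windowedCount
  .select(to_json(struct(col("window"), col("songName"), col("count"))).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "songCounts")
  .option("checkpointLocation", "/tmp/spark-kafka-checkpoint")
  .outputMode("append")
  .start()
query.awaitTermination()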