I am running Structured Streaming on Spark 2.1.0 with Zeppelin 0.7, consuming data from Kafka, and I am trying to visualize the streaming result with spark.sql, as follows:
%spark2
val spark = SparkSession
  .builder()
  .appName("Spark structured streaming Kafka example")
  .master("yarn")
  .getOrCreate()

val inputstream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "n11.hdp.com:6667,n12.hdp.com:6667,n13.hdp.com:6667,n10.hdp.com:6667,n9.hdp.com:6667")
  .option("subscribe", "st")
  .load()

val stream = inputstream.selectExpr("CAST(value AS STRING)").as[String]
  .select(
    expr("(split(value, ','))[0]").cast("string").as("pre_post_paid"),
    expr("(split(value, ','))[1]").cast("double").as("DataUpload"),
    expr("(split(value, ','))[2]").cast("double").as("DataDowndownload"))
  .filter("DataUpload is not null and DataDowndownload is not null")
  .groupBy("pre_post_paid")
  .agg(sum("DataUpload") + sum("DataDowndownload") as "size")

val query = stream.writeStream
  .format("memory")
  .outputMode("complete")
  .queryName("test")
  .start()
After the query starts, I query the "test" table as follows:
%sql select * from test
The result only updates when I re-run the paragraph manually. My question: how can I make it refresh automatically as new data is processed (a streaming visualization), as shown in this example:
Insights Without Tradeoffs: Using Structured Streaming in Apache Spark
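One workaround I have considered is polling the in-memory sink from a Scala paragraph instead of the %sql paragraph, so the table is re-queried on a timer; a minimal sketch, assuming the memory-sink query named "test" above is already running (the iteration count and 5-second interval are arbitrary choices, not anything from the docs):

```scala
// Hypothetical polling loop for a Zeppelin %spark2 paragraph.
// The memory sink keeps the "test" table up to date in the background;
// this simply re-reads it on a fixed interval and reprints the result.
for (_ <- 1 to 10) {
  spark.sql("select * from test").show(truncate = false)
  Thread.sleep(5000) // re-query every 5 seconds
}
```

This only reprints text output, though; it does not redraw Zeppelin's built-in %sql charts, which is what I am really after.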