如何自动更新zeppelin中的%spark.sql结果以进行结构化流式查询

问题描述 投票:4回答:1

我正在(带有zeppelin 0.7的spark 2.1.0中运行结构化流,用于来自kafka的数据,并且我试图通过spark.sql可视化流结果)>

如下:

%spark2
val spark = SparkSession
  .builder()
  .appName("Spark structured streaming Kafka example")
  .master("yarn")
  .getOrCreate()
val inputstream = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "n11.hdp.com:6667,n12.hdp.com:6667,n13.hdp.com:6667 ,n10.hdp.com:6667, n9.hdp.com:6667")
    .option("subscribe", "st")
    .load()


val stream = inputstream.selectExpr("CAST( value AS STRING)").as[(String)].select(
             expr("(split(value, ','))[0]").cast("string").as("pre_post_paid"),
             expr("(split(value, ','))[1]").cast("double").as("DataUpload"),
             expr("(split(value, ','))[2]").cast("double").as("DataDowndownload"))
           .filter("DataUpload is not null and DataDowndownload is not null")
          .groupBy("pre_post_paid").agg(sum("DataUpload") + sum("DataDowndownload") as "size")
val query = stream.writeStream
.format("memory")
.outputMode("complete")
.queryName("test")
.start()

运行后,我对“测试”进行如下查询:

%sql
select *
from test

仅当我手动运行它时才更新,我的问题是如以下示例所示,如何在处理新数据(流化可视化)时使其更新:

Insights Without Tradeoffs: Using Structured Streaming in Apache Spark

我正在(来自zeppelin 0.7的spark 2.1.0中运行结构化流,用于来自kafka的数据,我正尝试通过spark.sql可视化流的结果,如下所示:%spark2 val spark = ...

apache-spark-sql spark-streaming visualization apache-zeppelin
1个回答
0
投票

替换行

© www.soinside.com 2019 - 2024. All rights reserved.