在Java上火花流数据帧和累加器

问题描述 投票:0回答:1

我正在Spark结构化流中处理kafka JSON流。作为微批次处理,我可以对流数据帧使用累加器吗?

LongAccumulator longAccum = new LongAccumulator("my accum");

Dataset<Row> df2 = df.filter(output.col("Called number").equalTo("0860"))
            .groupBy("Calling number").count();
// put row counter to accumulator for example
df2.javaRDD().foreach(row -> {longAccumulator.add(1);})

投掷

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

。我也很困惑这样使用累加器。将数据帧向下转换为RDD看起来很奇怪,而且不必要。如果没有RDD和foreach(),我可以这样做吗?

java apache-spark streaming accumulator
1个回答
0
投票

否,您可以直接使用下面的数据集进行访问-

 LongAccumulator longAccum = spark.sparkContext().longAccumulator("my accum");


        Dataset<Row> df = spark.range(100).withColumn("x", lit("x"));

        //access in map
        df.map((MapFunction<Row, Row>) row -> {
            longAccum.add(1);
            return  row;
        }, RowEncoder.apply(df.schema()))
                .count();

        // accumulator value
        System.out.println(longAccum.value()); // 100

        longAccum.reset();
        // access in for each
        df.foreach((ForeachFunction<Row>) row -> longAccum.add(1));

        // accumulator value
        System.out.println(longAccum.value()); // 100

请注意,累加器值仅在执行action时才更新。

© www.soinside.com 2019 - 2024. All rights reserved.