什么时候广播变量会发生变化?

问题描述 投票:3回答:1

我被告知广播变量应该是不可变的。

然而,我看到了一个代码片段,其中广播变量用作标志。

public class TestBroadcast {

  private static JavaStreamingContext jssc;
  private static volatile Broadcast<Boolean> done;

  public static void main(String[] args) throws InterruptedException {

    Logger.getLogger("org").setLevel(Level.ERROR);

    List<String> log = Arrays.asList("X", "X", "X");

    SparkConf sparkConf = new SparkConf().setAppName("Test").setMaster("local[2]");
    jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
    done = jssc.sparkContext().broadcast(Boolean.FALSE);    // false in the beginning

    JavaRDD<String> _rdd = jssc.sparkContext().parallelize(log);
    Queue<JavaRDD<String>> queue = new LinkedList<>();
    queue.add(_rdd);
    JavaDStream<String> lines = jssc.queueStream(queue);

    lines.foreachRDD(
        rdd -> {
          rdd.foreachPartition(x -> System.out.println(done.getValue())); // executor get false
          done = jssc.sparkContext().broadcast(Boolean.TRUE); // driver set the variable to true
/*MARK*/  rdd.foreachPartition(x -> System.out.println(done.getValue())); // executor get true
        });

    jssc.start();

    jssc.awaitTermination();

  }

}

广播变量在用/*MARK*/original source)注释的行上发生变化,为什么会发生这种情况?

apache-spark spark-streaming
1个回答
2
投票

尽管done同名,但两个广播变量是不同的。

我必须承认,我从未见过如此使用广播变量(也许是因为它导致了错误的结论,因为你的问题似乎证明了这一点)。除非我错了,因为布尔值非常小(即使没有广播变量也不会对序列化的消息有效负载增加太多),因此用法并没有真正增加太多。

更不寻常的是,这是在使用发生在驱动程序上的foreachRDD的Spark Streaming应用程序中,因此可以访问执行程序上不可用的JavaStreamingContext(会导致NullPointerException)。


事实上,广播变量的生命周期确实允许改变广播变量的值。将广播变量视为执行器的内存空间中可用的内容的句柄,可以根据需要将其解析为值。

您可以在驱动程序上使用unpersist方法(将触发“执行程序上的删除”消息发送给执行程序),因此以下value将再次获得广播值(可能与最初的更改)。

unpersist():Unit在执行程序上异步删除此广播的缓存副本。如果在调用此广播后使用广播,则需要将其重新发送给每个执行者。

对于这个特殊情况(使用foreachRDD)它没有多大意义,因为foreachRDD引入了另一个层,您可以在其中操作广播变量并提交Spark作业。

© www.soinside.com 2019 - 2024. All rights reserved.