Flink 是按值还是按引用处理状态?

问题描述 投票:0回答:1

到目前为止我发现的所有示例都类似地处理 Flink

ValueState

  private ValueState<Long> myState;

  @Override
  public void open(Configuration parameters) {
    myState = getRuntimeContext().getState(new ValueStateDescriptor<>(
        "myState", Types.LONG));
  }

  @Override
  public void processElement(Integer value, Context ctx, Collector<Integer> out) {
    // Read the state
    Long stateValue = myState.value();
    if (stateValue == null) {
      // If first time, init to something
      stateValue = 0L;
    }

    // some logic
    
    // Write back to state
    myState.update(stateValue);
    out.collect(value);
  }

我不清楚

value()
update()
中发生了什么 - 那里是否发生序列化/反序列化,即是否存储了
stateValue
的副本或引用?

与 POJO 而不是

Long
相比,差异变得更加相关:

  private static class MyPojo {
    public String name;
    public long birthday;
  }

  private ValueState<MyPojo> myState;

  @Override
  public void open(Configuration parameters) {
    myState = getRuntimeContext().getState(new ValueStateDescriptor<>(
        "myState", Types.POJO(MyPojo.class)));
  }

  @Override
  public void processElement(Integer value, Context ctx, Collector<Integer> out) {
    MyPojo stateValue = myState.value();
    if (stateValue == null) {
      stateValue = new MyPojo();
      myState.update(stateValue);
    }

    // some logic

    stateValue.name = String.valueOf(value);
    stateValue.birthday = System.currentTimeMillis();
    // do I want to update() here, or is state being modified by-ref?

    out.collect(value);
  }

在修改我脱离状态的对象后是否需要

update()
,或者我对对象所做的更改最终会被 Flink 持久化吗?

java apache-flink
1个回答
0
投票

value()
正在获取当前键的状态副本,在 RocksDB 状态后端的情况下,正在发生反序列化。仅当您调用
update()
时才会修改状态。

© www.soinside.com 2019 - 2024. All rights reserved.