到目前为止我发现的所有示例都类似地处理 Flink
ValueState
;
private ValueState<Long> myState;
@Override
public void open(Configuration parameters) {
myState = getRuntimeContext().getState(new ValueStateDescriptor<>(
"myState", Types.LONG));
}
@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out) {
// Read the state
Long stateValue = myState.value();
if (stateValue == null) {
// If first time, init to something
stateValue = 0L;
}
// some logic
// Write back to state
myState.update(stateValue);
out.collect(value);
}
我不清楚
value()
和 update()
中发生了什么 - 那里是否发生序列化/反序列化,即是否存储了 stateValue
的副本或引用?
与 POJO 而不是
Long
相比,差异变得更加相关:
private static class MyPojo {
public String name;
public long birthday;
}
private ValueState<MyPojo> myState;
@Override
public void open(Configuration parameters) {
myState = getRuntimeContext().getState(new ValueStateDescriptor<>(
"myState", Types.POJO(MyPojo.class)));
}
@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out) {
MyPojo stateValue = myState.value();
if (stateValue == null) {
stateValue = new MyPojo();
myState.update(stateValue);
}
// some logic
stateValue.name = String.valueOf(value);
stateValue.birthday = System.currentTimeMillis();
// do I want to update() here, or is state being modified by-ref?
out.collect(value);
}
在修改我脱离状态的对象后是否需要
update()
,或者我对对象所做的更改最终会被 Flink 持久化吗?
value()
正在获取当前键的状态副本,在 RocksDB 状态后端的情况下,正在发生反序列化。仅当您调用 update()
时才会修改状态。