假设我们有一个 DataStream,并且可以将 MapState 附加到每个 String 元素,同时将其传递到下游。喜欢:
ds.keyBy(s -> s.hashCode() % 10)
.process(new KeyedProcessFuncion<Integer, String, Tuple2<String, MapState<String,String>>>(){
transient MapState<String,String> map;
public void open(Configuration cfg) {
map = this.getRuntimeContext().getMapState(new MapStateDescription<>("demo", String.class, String.class));
}
public void processElement(String in, Context ctx, Collector<Tuple2<String, MapState<String,String>>> out) throws Exception {
if(map.get("key") == null){
map.put("key", "val");
}
out.collect(new Tuple2<>(in, map));
}
})
.keyBy(t -> t.f0.hashCode() % 10)
.process(/* the demo MapState can be accessed here*/)
上面的代码可以工作,但问题是:访问上游传递给下游的MapState是否安全?
不,因为地图(如果您使用的是 RocksDB)不是状态的内存表示。另外,您实际上在
MapState
中存储了什么?请注意,当您的 KeyedProcessFunction
被调用时,状态已经设置为当前键,因此您不需要将键存储在映射中。