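I'm running into a bug in my Flink application where calling myValueState.value() inside a KeyedProcessFunction sometimes returns null, even though the logic in my code should guarantee that the object returned by .value() is not null. These null values are returned rarely, and they do not reappear when the application is restarted and run on the same data it previously failed on. Note: myValueState is of type ValueState<java.time.LocalDateTime>.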
More context
Code
processElement calls updateMinTimestamp and then reads minTimestamp.value() through getLocalDateTimeValueState; that read is the one that occasionally returns null.
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class MyClass extends KeyedProcessFunction<String, Tuple2<String, byte[]>, Tuple2<String,String>> {
    private transient ObjectMapper objectMapper;
    private transient ValueState<LocalDateTime> minTimestamp;

    @Override
    public void processElement(final Tuple2<String, byte[]> input, final KeyedProcessFunction<String, Tuple2<String, byte[]>, Tuple2<String, String>>.Context ctx, final Collector<Tuple2<String, String>> out) throws Exception {
        Event maybeDeserializedEvent = deserializeBytesToEvent(input.f1);
        if (maybeDeserializedEvent instanceof SuccessfullyDeserializedEvent) {
            SuccessfullyDeserializedEvent event = (SuccessfullyDeserializedEvent) maybeDeserializedEvent;
            System.out.printf(
                "Deserialized event category '%s' for txnId '%s' with timestamp '%s'\n",
                event.getCategory(), event.getTxnId(), event.getTimestamp()
            );
            updateMinTimestamp(event.getTimestamp());
            // some other stuff (processing + aggregating event, unrelated to the minTimestamp...
            //....
            // this value is sometimes null, which triggers a NPE when calling `toString` on it
            // based on the logic of the updateMinTimestamp() method, `minTimestampValue` should never be null
            LocalDateTime minTimestampValue = getLocalDateTimeValueState(minTimestamp);
            // sometimes throws NPE
            String minTimestampStr = minTimestampValue.toString();
            // some more stuff, include ctx.out(...)
            //....
        }
    }

    @Override
    public void open(Configuration configuration) {
        objectMapper = new ObjectMapper();
        minTimestamp = getRuntimeContext().getState(createEventTimestampDescriptor("min-timestamp", 2));
    }

    private ValueStateDescriptor<LocalDateTime> createEventTimestampDescriptor(String name, Integer ttl) {
        ValueStateDescriptor<LocalDateTime> eventTimestampDescriptor = new ValueStateDescriptor<>(
            name,
            new LocalDateTimeSerializer()
        );
        eventTimestampDescriptor.enableTimeToLive(
            StateTtlConfig
                .newBuilder(Time.hours(ttl))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
                .build()
        );
        return eventTimestampDescriptor;
    }

    private Event deserializeBytesToEvent(byte[] serializedEvent) {
        SuccessfullyDeserializedEvent event = new SuccessfullyDeserializedEvent();
        try {
            final JsonNode node = objectMapper.readTree(serializedEvent);
            event.setCategory(node.get("category").asLong());
            event.setTxnId(node.get("txnId").asText());
            event.setTimestamp(LocalDateTime.parse(node.get("timestamp").asText(), DateTimeFormatter.ISO_DATE_TIME));
            event.setPayload(objectMapper.readTree(node.get("payload").asText()));
            return event;
        } catch (IOException e) {
            System.out.printf(
                "Failed to deserialize event with category:'%s', txnId:'%s', timestamp:'%s', payload:'%s'\n",
                event.getCategory(),
                event.getTxnId(),
                event.getTimestamp(),
                event.getPayload()
            );
            return new UnsuccessfullyDeserializedEvent();
        }
    }

    void updateMinTimestamp(LocalDateTime newTimestamp) {
        try {
            final LocalDateTime currentMinTimestamp = minTimestamp.value();
            if (currentMinTimestamp == null || newTimestamp.isBefore(currentMinTimestamp)) {
                minTimestamp.update(newTimestamp);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private LocalDateTime getLocalDateTimeValueState(ValueState<LocalDateTime> localDateTimeValueState) {
        try {
            return localDateTimeValueState.value();
        } catch (IOException e) {
            throw new RuntimeException("Error grabbing localdatetime from value state");
        }
    }

    public interface Event {}

    public class SuccessfullyDeserializedEvent implements Event {
        private Long category;
        private JsonNode payload;
        private String txnId;
        private LocalDateTime timestamp;

        SuccessfullyDeserializedEvent() {}

        // getters
        Long getCategory() {
            return this.category;
        }
        JsonNode getPayload() {
            return this.payload;
        }
        String getTxnId() {
            return this.txnId;
        }
        LocalDateTime getTimestamp() {
            return this.timestamp;
        }

        // setters
        void setCategory(Long category) {
            this.category = category;
        }
        void setPayload(JsonNode payload) {
            this.payload = payload;
        }
        void setTxnId(String txnId) {
            this.txnId = txnId;
        }
        void setTimestamp(LocalDateTime timestamp) {
            this.timestamp = timestamp;
        }
    }

    public class UnsuccessfullyDeserializedEvent implements Event {
    }
}
Any information on why this bug happens and how to prevent it would be greatly appreciated.
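Answering my own question: I believe the bug was caused by a combination of my own bad coding practices and odd behavior from Flink's state-expiry (TTL) mechanism.

First, there is no reason to fetch the updated state again within the processElement call. That is inefficient: all you need to do is get the state and update it, and after the update there should be no need to read it back.

Second, the TTL configuration ([...] OnReadAndWrite). Some time has passed between when I dug into this problem and now, but I can say with confidence that Flink's TTL mechanism has a lot of unexpected behavior.

In general, a safe pattern for working with Flink state is to fetch all state near the beginning of the stateful process function, check its values, and then act accordingly (i.e., do x if all state is present, do y if some state has expired but other state has not, and so on). If your Flink state has a TTL, handling it carelessly can produce race conditions (will your code get the state, or will it have expired unexpectedly?).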
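To make the read-once pattern concrete, here is a minimal sketch in plain Java. It is not the code from my job: SimpleState, InMemoryState, AlwaysExpiredState and updateAndGetMin are hypothetical names introduced only for illustration, and SimpleState is a stand-in for Flink's ValueState<LocalDateTime> so the example runs without Flink on the classpath (the real value() can also throw IOException). The point is that the caller keeps the decided minimum in a local variable and never reads the state a second time.

```java
import java.time.LocalDateTime;

final class MinTimestampSketch {

    /** Hypothetical stand-in for Flink's ValueState<T>, so the sketch runs without Flink. */
    interface SimpleState<T> {
        T value();
        void update(T newValue);
    }

    /** Plain in-memory state that keeps whatever was last written. */
    static final class InMemoryState<T> implements SimpleState<T> {
        private T stored;
        @Override public T value() { return stored; }
        @Override public void update(T newValue) { stored = newValue; }
    }

    /** State that forgets every write, imitating a value that is gone by the next read. */
    static final class AlwaysExpiredState<T> implements SimpleState<T> {
        @Override public T value() { return null; }
        @Override public void update(T newValue) { /* dropped on purpose */ }
    }

    /**
     * Reads the state exactly once, writes back only if the minimum changed,
     * and returns the decided value from a local variable instead of re-reading state.
     */
    static LocalDateTime updateAndGetMin(SimpleState<LocalDateTime> state, LocalDateTime incoming) {
        LocalDateTime current = state.value(); // null on the first event for a key, or after expiry
        if (current == null || incoming.isBefore(current)) {
            state.update(incoming);
            return incoming;
        }
        return current;
    }

    public static void main(String[] args) {
        SimpleState<LocalDateTime> state = new InMemoryState<>();
        LocalDateTime later = LocalDateTime.parse("2024-01-01T12:00:00");
        LocalDateTime earlier = LocalDateTime.parse("2024-01-01T09:30:00");
        System.out.println(updateAndGetMin(state, later));   // first event becomes the minimum
        System.out.println(updateAndGetMin(state, earlier)); // earlier event replaces it
        // Even if the state loses the value immediately, the caller still gets a non-null result.
        System.out.println(updateAndGetMin(new AlwaysExpiredState<>(), later));
    }
}
```

In the original processElement this would mean using the value returned by the update step directly, rather than calling getLocalDateTimeValueState(minTimestamp) afterwards.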