Why does Flink ValueState.value() sometimes incorrectly return null?


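I'm running into a bug in my Flink application where calling myValueState.value() inside a KeyedProcessFunction sometimes returns null, even though the logic in my code should guarantee that the object returned by .value() is non-null. These nulls are returned rarely, and they do not reappear when the application is restarted and run on the same data that previously failed. Note: myValueState is of type ValueState<java.time.LocalDateTime>.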

More context

  • I'm using Flink 1.15.2, hosted on AWS Kinesis Data Analytics; this is where the error occurs
  • The error does not occur locally
  • RocksDB is used as the state backend on AWS Kinesis Data Analytics
  • I'm using the DataStream API with Java 11

Code

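  • Near the top of the process function, I run updateMinTimestamp
  • As I understand it, this function should guarantee that the value of this value state is never null
  • Later in the code, I call minTimestamp.value() inside the getLocalDateTimeValueState function, and that call occasionally returns null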

    import java.io.IOException;
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;
    
    public class MyClass extends KeyedProcessFunction<String, Tuple2<String, byte[]>, Tuple2<String,String>> {
        private transient ObjectMapper objectMapper;
        private transient ValueState<LocalDateTime> minTimestamp;
    
        @Override
        public void processElement(final Tuple2<String, byte[]> input, final KeyedProcessFunction<String, Tuple2<String, byte[]>, Tuple2<String, String>>.Context ctx, final Collector<Tuple2<String, String>> out) throws Exception {
            Event maybeDeserializedEvent = deserializeBytesToEvent(input.f1);
    
            if (maybeDeserializedEvent instanceof SuccessfullyDeserializedEvent) {
                SuccessfullyDeserializedEvent event = (SuccessfullyDeserializedEvent) maybeDeserializedEvent;
                System.out.printf(
                        "Deserialized event category '%s' for txnId '%s' with timestamp '%s'\n",
                        event.getCategory(), event.getTxnId(), event.getTimestamp()
                );
    
                updateMinTimestamp(event.getTimestamp());
    
                // some other stuff (processing + aggregating the event, unrelated to minTimestamp)...
                //....
    
                // this value is sometimes null, which triggers a NPE when calling `toString` on it
                // based on the logic of the updateMinTimestamp() method, `minTimestampValue` should never be null
                LocalDateTime minTimestampValue = getLocalDateTimeValueState(minTimestamp);
    
                // sometimes throws NPE
                String minTimestampStr = minTimestampValue.toString();
    
                // some more stuff, including out.collect(...)
                //....
            }
        }
    
        @Override
        public void open(Configuration configuration) {
            objectMapper = new ObjectMapper();
            minTimestamp = getRuntimeContext().getState(createEventTimestampDescriptor("min-timestamp", 2));
        }
    
        private ValueStateDescriptor<LocalDateTime> createEventTimestampDescriptor(String name, Integer ttl) {
            ValueStateDescriptor<LocalDateTime> eventTimestampDescriptor = new ValueStateDescriptor<>(
                    name,
                    new LocalDateTimeSerializer()
            );
            eventTimestampDescriptor.enableTimeToLive(
                    StateTtlConfig
                            .newBuilder(Time.hours(ttl))
                            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                            .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
                            .build()
            );
            return eventTimestampDescriptor;
        }
    
        private Event deserializeBytesToEvent(byte[] serializedEvent) {
            SuccessfullyDeserializedEvent event = new SuccessfullyDeserializedEvent();
            try {
                final JsonNode node = objectMapper.readTree(serializedEvent);
                event.setCategory(node.get("category").asLong());
                event.setTxnId(node.get("txnId").asText());
                event.setTimestamp(LocalDateTime.parse(node.get("timestamp").asText(), DateTimeFormatter.ISO_DATE_TIME));
                event.setPayload(objectMapper.readTree(node.get("payload").asText()));
    
                return event;
            } catch (IOException e) {
                System.out.printf(
                        "Failed to deserialize event with category:'%s', txnId:'%s', timestamp:'%s', payload:'%s'\n",
                        event.getCategory(),
                        event.getTxnId(),
                        event.getTimestamp(),
                        event.getPayload()
                );
                return new UnsuccessfullyDeserializedEvent();
            }
        }
    
        void updateMinTimestamp(LocalDateTime newTimestamp) {
            try {
                final LocalDateTime currentMinTimestamp = minTimestamp.value();
                if (currentMinTimestamp == null || newTimestamp.isBefore(currentMinTimestamp)) {
                    minTimestamp.update(newTimestamp);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    
        private LocalDateTime getLocalDateTimeValueState(ValueState<LocalDateTime> localDateTimeValueState) {
            try {
                return localDateTimeValueState.value();
            } catch (IOException e) {
                throw new RuntimeException("Error reading LocalDateTime from value state", e);
            }
        }
    
        public interface Event {}
    
    
        public class SuccessfullyDeserializedEvent implements Event {
            private Long category;
            private JsonNode payload;
            private String txnId;
            private LocalDateTime timestamp;
    
            SuccessfullyDeserializedEvent() {}
    
            // getters
            Long getCategory() {
                return this.category;
            }
            JsonNode getPayload() {
                return this.payload;
            }
            String getTxnId() {
                return this.txnId;
            }
            LocalDateTime getTimestamp() {
                return this.timestamp;
            }
            // setters
            void setCategory(Long category) {
                this.category = category;
            }
            void setPayload(JsonNode payload) {
                this.payload = payload;
            }
            void setTxnId(String txnId) {
                this.txnId = txnId;
            }
            void setTimestamp(LocalDateTime timestamp) {
                this.timestamp = timestamp;
            }
        }
    
        public class UnsuccessfullyDeserializedEvent implements Event {
        }
    }
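The following is a small, self-contained Java model of one mechanism that could produce exactly this symptom; it is not Flink code. It assumes two behaviors described in Flink's state TTL documentation: expired values are removed when they are read (for example by ValueState#value), and with StateVisibility.ReturnExpiredIfNotCleanedUp an expired value that has not been removed yet may still be returned. If both apply to the same read, updateMinTimestamp receives the stale minimum (clearing it in the process), skips the update because the new timestamp is not earlier, and the later getLocalDateTimeValueState call then finds nothing. FakeTtlValueState, its manual clock, and MinTimestampRaceDemo are hypothetical names used only for this illustration.

```java
import java.time.LocalDateTime;

// Hypothetical, simplified model of a TTL-wrapped ValueState. This is NOT Flink's
// implementation; it only imitates two documented behaviors:
//   1. an expired entry is removed when it is read, and
//   2. with "return expired if not cleaned up" visibility, that read may still
//      return the stale value.
class FakeTtlValueState<T> {
    private final long ttlMillis;
    private T value;
    private long lastWriteMillis;
    private long nowMillis; // manual processing-time clock for the demo

    FakeTtlValueState(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void advanceClock(long millis) {
        nowMillis += millis;
    }

    T value() {
        T current = value;
        if (current != null && nowMillis - lastWriteMillis >= ttlMillis) {
            value = null; // expired entry is cleared on access...
        }
        return current;   // ...but the stale value is still returned this one time
    }

    void update(T newValue) {
        value = newValue;
        lastWriteMillis = nowMillis; // OnCreateAndWrite: only writes refresh the TTL
    }
}

class MinTimestampRaceDemo {
    // Same read -> conditional update -> read-again sequence as the question's
    // updateMinTimestamp() followed by getLocalDateTimeValueState().
    static LocalDateTime readUpdateReadAgain(FakeTtlValueState<LocalDateTime> state,
                                             LocalDateTime newTimestamp) {
        LocalDateTime currentMin = state.value();
        if (currentMin == null || newTimestamp.isBefore(currentMin)) {
            state.update(newTimestamp);
        }
        return state.value(); // null if the first read cleared an expired entry
    }

    public static void main(String[] args) {
        long twoHours = 2L * 60 * 60 * 1000;
        FakeTtlValueState<LocalDateTime> state = new FakeTtlValueState<>(twoHours);

        // First event for the key: stored and read back as expected.
        System.out.println(readUpdateReadAgain(state, LocalDateTime.of(2023, 1, 1, 10, 0))); // prints 2023-01-01T10:00

        // Three idle hours, then a later (not earlier) event for the same key.
        state.advanceClock(3L * 60 * 60 * 1000);
        System.out.println(readUpdateReadAgain(state, LocalDateTime.of(2023, 1, 1, 11, 0))); // prints null
    }
}
```

Under this model the null only appears when a key has been idle for longer than the TTL (2 hours in the question, measured in processing time) and the incoming timestamp is not earlier than the stale minimum. That would fit an error that is rare and does not reproduce when the same data is replayed at a different wall-clock time.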


Any insight into why this error happens and how to prevent it would be much appreciated.

apache-flink flink-streaming amazon-kinesis-analytics flink-state
1 answer

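Answering my own question: I believe the error was caused by a combination of my poor coding practice and some odd behavior of Flink's state expiration (TTL) mechanism: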

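  1. In my code, I was reading the state, updating it, and then reading the updated state again within the same processElement call. This is inefficient: all you need to do is read the state and update it; there is no need to read it again after the update.
  2. Flink's TTL behavior seems to be that if the state is in the middle of being cleared, updating it and then reading it back can still give you null (even with update type OnReadAndWrite). Some time has passed since I dug into this issue, but I can say with confidence that Flink's TTL mechanism has a lot of unexpected behavior.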
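Following point 1, a sketch of the change is to read the state once, write the new minimum if needed, and hand the resulting value back to the caller instead of calling value() a second time. SimpleValueState and InMemoryValueState are hypothetical stand-ins with the same value()/update() shape as Flink's ValueState, so the example runs without Flink on the classpath; MinTimestampUpdater.updateAndGetMin is likewise a hypothetical name.

```java
import java.io.IOException;
import java.time.LocalDateTime;

// Hypothetical stand-in with the same value()/update() shape as Flink's
// org.apache.flink.api.common.state.ValueState, so the sketch runs without Flink.
interface SimpleValueState<T> {
    T value() throws IOException;

    void update(T value) throws IOException;
}

// Hypothetical in-memory implementation used only to exercise the sketch.
class InMemoryValueState<T> implements SimpleValueState<T> {
    private T value;

    @Override
    public T value() {
        return value;
    }

    @Override
    public void update(T value) {
        this.value = value;
    }
}

final class MinTimestampUpdater {
    private MinTimestampUpdater() {}

    // Reads the state exactly once, writes the new minimum when needed, and returns
    // the minimum so the caller never has to call value() a second time.
    static LocalDateTime updateAndGetMin(SimpleValueState<LocalDateTime> state,
                                         LocalDateTime newTimestamp) throws IOException {
        LocalDateTime currentMin = state.value();
        if (currentMin == null || newTimestamp.isBefore(currentMin)) {
            state.update(newTimestamp);
            return newTimestamp;
        }
        return currentMin;
    }
}
```

Applied to the question, updateMinTimestamp would return the LocalDateTime it computed and processElement would use that local variable instead of calling getLocalDateTimeValueState(minTimestamp). One design choice to weigh: if the single read returns a stale (expired) minimum, this sketch still returns it without writing it back. If stale values should never be used, StateTtlConfig.StateVisibility.NeverReturnExpired makes that read return null instead, and the method then stores the new timestamp.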

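In general, a safe pattern for working with Flink state is: read all of the state near the beginning of your stateful process function, check the values, and then act accordingly (for example, do x if all of the state is present, do y if some of it has expired but the rest has not, and so on). If your Flink state has a TTL, handling it carelessly can create race conditions (will your code get the state, or will it unexpectedly expire first?).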
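A minimal sketch of that read-first, branch-second pattern, assuming two hypothetical pieces of per-key state (a minimum timestamp and a running count). In a real KeyedProcessFunction each would be a separate ValueState whose value() is called exactly once at the top of processElement. StateSnapshot, StateSnapshotKind, and SnapshotHandler are illustrative names, and the returned action strings stand in for whatever x and y mean in your job.

```java
import java.time.LocalDateTime;

// Which per-key state survived (TTL may have expired some of it independently).
enum StateSnapshotKind { ALL_PRESENT, PARTIALLY_EXPIRED, ALL_ABSENT }

// Hypothetical snapshot of all per-key state, read once at the top of processElement.
final class StateSnapshot {
    final LocalDateTime minTimestamp; // null if never set or expired
    final Long count;                 // null if never set or expired

    StateSnapshot(LocalDateTime minTimestamp, Long count) {
        this.minTimestamp = minTimestamp;
        this.count = count;
    }

    StateSnapshotKind kind() {
        boolean hasMin = minTimestamp != null;
        boolean hasCount = count != null;
        if (hasMin && hasCount) {
            return StateSnapshotKind.ALL_PRESENT;
        }
        if (!hasMin && !hasCount) {
            return StateSnapshotKind.ALL_ABSENT;
        }
        return StateSnapshotKind.PARTIALLY_EXPIRED;
    }
}

final class SnapshotHandler {
    private SnapshotHandler() {}

    // Every branch decides from the local snapshot; nothing here re-reads state.
    static String decide(StateSnapshot snapshot) {
        switch (snapshot.kind()) {
            case ALL_PRESENT:
                return "update-aggregate";    // "x": everything is there
            case PARTIALLY_EXPIRED:
                return "reset-partial-state"; // "y": some state expired, some did not
            default:
                return "initialize-new-key";  // nothing stored yet, or everything expired
        }
    }
}
```

Because every decision is made from the local snapshot, a TTL cleanup triggered after the initial reads cannot change the outcome partway through the same processElement call.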
