Flink1.5.4例外:数据流损坏,找到的标记:105

问题描述 投票:0回答:1

我的程序想在没有Flink窗口的情况下加入两个流。

我连接两个流,并定义一个类A扩展RichCoFlatMapFunction来处理它们。在A类中,我使用Guava缓存来保存来自flatmap1 / 2方法的所有数据,并通过流中的标记将它们连接在一起。然后,Guava缓存具有一个删除侦听器,以将加入和过期的数据收集到下一个Flink函数。

private synchronized void collect(ReqFeatures features) {
    feaCollector.collect(features);
}

每次开始时,它运行良好,但是几个小时后,由于这个异常,它总是死了。

java.io.IOException: Corrupt stream, found tag: 105
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:220)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:49)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:172)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
at java.lang.Thread.run(Thread.java:748)

有时还会出现另一个错误日志:

java.lang.IllegalStateException: When there are multiple buffers, an unfinished bufferConsumer can not be at the head of the buffers queue.
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.pollBuffer(PipelinedSubpartition.java:158)
at org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView.getNextBuffer(PipelinedSubpartitionView.java:51)
at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextBuffer(LocalInputChannel.java:186)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:551)
at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:508)
at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
at java.lang.Thread.run(Thread.java:748)

如果我改用Flink Window Function,则不会发生此异常。为什么会发生此异常,我该如何解决?

apache-flink flink-streaming
1个回答
0
投票

我可以确认这也在Flink 1.9.1中发生(尽管对我们来说,这在我们运行flink stop <job-id>时发生)

© www.soinside.com 2019 - 2024. All rights reserved.