我们在 Kubernetes 中运行 Apache Flink 1.9。我们有一些工作消耗 Kafka 事件并每分钟收集计数。本来工作还不错,但是最近突然出现很多错误
java.lang.RuntimeException: Partition already finished.
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
引发错误的代码来自获取事件并发出水印的监听器。
// We use an underlying API lib to get a source Context from Flink, sorry not to have source code here
import org.apache.flink.streaming.api.functions.source.SourceFunction
protected var context: SourceFunction.SourceContext[T] = ...
validEventsSorted.foreach { event =>
try {
context.collectWithTimestamp(event, event.occurredAt.toEpochMilli)
context.emitWatermark(new Watermark(event.occurredAt.minusSeconds(30).toEpochMilli))
} catch {
case e: Throwable =>
logger.error(
s"Failed to add to context. Event EID: ${event.nakadiMetaData.eid}." +
s" Event: $event",
e
)
}
}
重新启动 Flink 作业管理器和任务管理器将结束错误,但此问题稍后可能会再次出现。
据我了解和猜测,
Partition already finished
是当操作员尝试将事件传递给下一个操作员(分区)时引起的,但我不明白这是如何发生的。
这是我们在 Source 上的代码
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
class SourceImpl[T: ClassTag](
listener: KafkaListener[T]
)
extends RichParallelSourceFunction[T] {
@volatile private var isCancelled: Boolean = false
@volatile private var consumerFuture: java.util.concurrent.Future[_] = _
override def run(ctx: SourceFunction.SourceContext[T]): Unit = {
while (!isCancelled) {
val runnable = KafkaClient
.stream(subscription)
.withStreamParameters(streamParameters)
.runnable(classTag[T].runtimeClass.asInstanceOf[Class[T]], listener)
val executorService = Executors.newSingleThreadExecutor()
consumerFuture = executorService.submit(runnable)
consumerFuture.get() // This is blocking
} catch {
case e: Throwable =>
logger.warn(s"Unknown error consuming events", e)
}
}
}
override def cancel(): Unit = {
isCancelled = true
consumerFuture.cancel(true)
}
}
有人知道为什么以及如何解决这个问题吗?
原来我们的
SourceImpl
有一个bug。当 JobManager 取消此作业时,会调用 cancel
方法,但可能会失败,并且 executorService
不会关闭,并且 runnable
仍在 TaskManager 中运行,消耗事件并发出 WaterMark。由于作业已在 JobManager 和 TaskManager 中标记为已取消,因此水印发射将导致 Partition already finished
异常。
因此,我们修复了显式关闭
ExecutoreService
的问题
// Shutdown executorService
if (executorService != null && !executorService.isShutdown) {
executorService.shutdownNow()
}
完整代码如下
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
class SourceImpl[T: ClassTag](
listener: KafkaListener[T]
)
extends RichParallelSourceFunction[T] {
@volatile private var isCancelled: Boolean = false
@volatile private var consumerFuture: java.util.concurrent.Future[_] = _
override def run(ctx: SourceFunction.SourceContext[T]): Unit = {
val executorService = Executors.newSingleThreadExecutor()
while (!isCancelled) {
val runnable = KafkaClient
.stream(subscription)
.withStreamParameters(streamParameters)
.runnable(classTag[T].runtimeClass.asInstanceOf[Class[T]], listener)
consumerFuture = executorService.submit(runnable)
consumerFuture.get() // This is blocking
} catch {
case e: Throwable =>
logger.warn(s"Unknown error consuming events", e)
}
}
// Shutdown executorService
if (executorService != null && !executorService.isShutdown) {
executorService.shutdownNow()
}
}
override def cancel(): Unit = {
isCancelled = true
consumerFuture.cancel(true)
}
}
顺便说一句,我们之所以有一个新的
ExecutorService
,是为了在单独的线程池中运行监听器,这不会影响Flink线程池。