Apache Flink throws "Partition already finished" exception


We run Apache Flink 1.9 on Kubernetes. We have jobs that consume Kafka events and collect per-minute counts. The jobs used to run fine, but recently they suddenly started throwing a lot of errors:

java.lang.RuntimeException: Partition already finished.
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)

The code that triggers the error lives in a listener that receives events and emits watermarks:

    // We use an underlying API lib to get a SourceContext from Flink; sorry, the library source code is not available here
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.watermark.Watermark

    protected var context: SourceFunction.SourceContext[T] = ...

    validEventsSorted.foreach { event =>
      try {
        context.collectWithTimestamp(event, event.occurredAt.toEpochMilli)
        // The watermark trails event time by 30 seconds to tolerate out-of-order events
        context.emitWatermark(new Watermark(event.occurredAt.minusSeconds(30).toEpochMilli))
      } catch {
        case e: Throwable =>
          logger.error(
            s"Failed to add to context. Event EID: ${event.nakadiMetaData.eid}." +
              s" Event: $event",
            e
          )
      }
    }

Restarting the Flink JobManager and TaskManagers stops the errors, but the problem can resurface later.

As far as I understand and guess, "Partition already finished" is raised when an operator tries to hand an event over to the next operator (partition), but I don't see how that can happen here.
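
To make the failure mode concrete, here is a minimal sketch (our own illustration, not Flink's actual code) of the kind of guard that produces this error: once the downstream result partition has been marked finished, any further record or watermark is rejected.

    // Illustrative sketch only, NOT Flink's real implementation: a writer
    // that, like Flink's result partition, rejects records once finished.
    class FinishablePartition[T] {
      @volatile private var finished: Boolean = false

      // Called when the task completes or the job is cancelled
      def finish(): Unit = finished = true

      def emit(record: T): Unit = {
        if (finished) {
          // This condition is what surfaces as the RuntimeException above
          throw new RuntimeException("Partition already finished.")
        }
        // ... otherwise hand the record to the downstream channel ...
      }
    }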

Here is the code of our Source:

import java.util.concurrent.Executors

import scala.reflect.{ClassTag, classTag}

import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

class SourceImpl[T: ClassTag](
    listener: KafkaListener[T]
) extends RichParallelSourceFunction[T] {

  @volatile private var isCancelled: Boolean = false

  @volatile private var consumerFuture: java.util.concurrent.Future[_] = _

  override def run(ctx: SourceFunction.SourceContext[T]): Unit = {
    while (!isCancelled) {
      try {
        val runnable = KafkaClient
          .stream(subscription)
          .withStreamParameters(streamParameters)
          .runnable(classTag[T].runtimeClass.asInstanceOf[Class[T]], listener)

        val executorService = Executors.newSingleThreadExecutor()
        consumerFuture = executorService.submit(runnable)
        consumerFuture.get() // This is blocking
      } catch {
        case e: Throwable =>
          logger.warn(s"Unknown error consuming events", e)
      }
    }
  }

  override def cancel(): Unit = {
    isCancelled = true
    consumerFuture.cancel(true)
  }
}

Does anyone know why this happens and how to fix it?

exception kubernetes apache-flink partition
1 Answer

It turned out that our SourceImpl had a bug. When the JobManager cancels the job, the cancel method is invoked, but it can fail: the executorService is never shut down, and the runnable keeps running inside the TaskManager, consuming events and emitting watermarks. Since the job has already been marked as cancelled in both the JobManager and the TaskManager, emitting a watermark then triggers the "Partition already finished" exception.

So we fixed it by explicitly shutting down the ExecutorService:

    // Shutdown executorService
    if (executorService != null && !executorService.isShutdown) {
      executorService.shutdownNow()
    }
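
One caveat worth noting (our own suggestion, not part of the original fix): shutdownNow() only interrupts the running task and returns immediately. If you want to be sure the consumer thread has actually exited before run() returns, you can pair it with awaitTermination:

    import java.util.concurrent.TimeUnit

    // Hedged hardening sketch, assuming the same executorService as above:
    // interrupt the blocking runnable, then wait (bounded) for the thread to exit
    if (executorService != null && !executorService.isShutdown) {
      executorService.shutdownNow()
      if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
        logger.warn("Kafka consumer executor did not terminate within 10 seconds")
      }
    }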

The full code is below:

import java.util.concurrent.Executors

import scala.reflect.{ClassTag, classTag}

import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

class SourceImpl[T: ClassTag](
    listener: KafkaListener[T]
) extends RichParallelSourceFunction[T] {

  @volatile private var isCancelled: Boolean = false

  @volatile private var consumerFuture: java.util.concurrent.Future[_] = _

  override def run(ctx: SourceFunction.SourceContext[T]): Unit = {

    // One executor for the lifetime of the source, shut down explicitly below
    val executorService = Executors.newSingleThreadExecutor()

    while (!isCancelled) {
      try {
        val runnable = KafkaClient
          .stream(subscription)
          .withStreamParameters(streamParameters)
          .runnable(classTag[T].runtimeClass.asInstanceOf[Class[T]], listener)

        consumerFuture = executorService.submit(runnable)
        consumerFuture.get() // This is blocking
      } catch {
        case e: Throwable =>
          logger.warn(s"Unknown error consuming events", e)
      }
    }

    // Shutdown executorService so the runnable cannot outlive the job
    if (executorService != null && !executorService.isShutdown) {
      executorService.shutdownNow()
    }
  }

  override def cancel(): Unit = {
    isCancelled = true
    // Guard against cancel() being called before the first submit
    if (consumerFuture != null) {
      consumerFuture.cancel(true)
    }
  }
}

By the way, the reason we create our own ExecutorService is to run the listener in a separate thread pool, so that it does not interfere with Flink's thread pool.
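
If you do run the listener on your own pool, one optional refinement (our addition, not in the original code) is to give the consumer thread a name and daemon status, so it is easy to spot in thread dumps and never keeps the TaskManager JVM alive:

    import java.util.concurrent.{Executors, ExecutorService, ThreadFactory}

    // Hypothetical variant of the executor creation shown above
    val executorService: ExecutorService = Executors.newSingleThreadExecutor(
      new ThreadFactory {
        override def newThread(r: Runnable): Thread = {
          val t = new Thread(r, "kafka-listener-consumer")
          t.setDaemon(true) // do not block JVM shutdown
          t
        }
      }
    )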
