Flink job stays stuck in DEPLOYING or INITIALIZING


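I deploy my Flink job with flink-kubernetes-operator. Checkpointing is enabled, and the checkpoint directory is a mounted PVC. The state backend is RocksDB with incremental checkpoints enabled. However, the job runs into several problems: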

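  1. When the service restarts, some subtasks stay in DEPLOYING or INITIALIZING indefinitely, as if blocked, and never make progress, while other subtasks are RUNNING. Sometimes deleting the job and redeploying it makes it run normally again.
  2. The incremental checkpoint size seems to keep growing, even though I set a TTL on my custom state and the other functions are ReduceFunctions. How can I find out where state might be leaking?
  3. Sometimes a running job appears to block and stops consuming, and checkpoints keep failing at the same time. Is there a good troubleshooting approach for this?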

Flink version: 1.14.4

flink-kubernetes-operator version: release-1.4.0, link: https://github.com/apache/flink-kubernetes-operator, commit: 7fc23a1

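I don't know how to debug this. When the job hangs, I look at the thread stacks and find blocked threads, but the blocked threads differ each time. Sometimes it looks like this: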

[arthas@1]$ thread -b 
"Window(TumblingEventTimeWindows(60000), EventTimeTrigger, CommonWindowReduceFunction, PassThroughWindowFunction) -> Flat Map (22/72)#0" Id=157 BLOCKED on java.util.jar.JarFile@449cf4a0 owned by "Window(TumblingEventTimeWindows(60000), EventTimeTrigger, AppGroupTransactionEventApplyFunction) -> (Sink: Unnamed, Timestamps/Watermarks -> Flat Map) (31/32)#0" Id=151
    at java.util.zip.ZipFile$ZipFileInputStream.read(ZipFile.java:719)
    -  blocked on java.util.jar.JarFile@449cf4a0
    at java.util.zip.ZipFile$ZipFileInflaterInputStream.fill(ZipFile.java:434)
    at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:158)
    at sun.misc.Resource.getBytes(Resource.java:124)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:463)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:71)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    -  locked org.apache.flink.util.ChildFirstClassLoader@697c9014 <---- but blocks 93 other threads!
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at java.util.ArrayList.readObject(ArrayList.java:797)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
    at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:485)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1612)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriterDelegate(StreamTask.java:1596)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:376)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:359)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:332)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:324)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:314)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.<init>(OneInputStreamTask.java:75)
    at sun.reflect.GeneratedConstructorAccessor37.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1582)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:740)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:748)

But sometimes it is something else:

"System Time Trigger for Window(TumblingEventTimeWindows(60000), EventTimeTrigger, CommonWindowReduceFunction, PassThroughWindowFunction) (32/48)#0" Id=232 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@783b47ad
    at sun.misc.Unsafe.park(Native Method)
    -  waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@783b47ad
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ...


"KeyedProcess -> Sink: CCLOUD_APP_SPAN_PORTRAIT_DETECT (24/32)#0" Id=134 BLOCKED on org.apache.flink.util.ChildFirstClassLoader@53624f6b owned by "Window(TumblingEventTimeWindows(60000), EventTimeTrigger, CommonWindowReduceFunction, PassThroughWindowFunction) (32/48)#0" Id=92
    at java.lang.Class.getDeclaredFields0(Native Method)
    -  blocked on org.apache.flink.util.ChildFirstClassLoader@53624f6b
    at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
    at java.lang.Class.getDeclaredField(Class.java:2068)
    at java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1857)
    at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:79)
    at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:506)
    at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
    at java.security.AccessController.doPrivileged(Native Method)
java apache-flink flink-streaming rocksdb checkpoint
1 answer

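Regarding the first question: if your state is very large, much larger than your managed memory size (probably well over 4 GB), Flink will consume a lot of disk I/O when the job restarts from a savepoint/checkpoint.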

If you see disk IOPS hitting its limit, upgrading the disk can relieve the problem temporarily, but ultimately you need to reduce the state size.
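To judge whether the state size is really going down (or still growing), one option is to measure what the checkpoints occupy on the mounted PVC. The following is a minimal sketch using only the JDK. It assumes the usual Flink layout in which the job's checkpoint directory contains `chk-N`, `shared` and `taskowned` subdirectories, with incremental RocksDB files ending up under `shared`. The class name `CheckpointDirSize` and the path passed on the command line are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Stream;

public class CheckpointDirSize {

    /** Sums the sizes of all regular files under root; returns 0 if root does not exist. */
    static long sizeOf(Path root) {
        if (!Files.exists(root)) {
            return 0L;
        }
        try (Stream<Path> files = Files.walk(root)) {
            return files.filter(Files::isRegularFile).mapToLong(p -> {
                try {
                    return Files.size(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }).sum();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the bytes used by each direct subdirectory (e.g. chk-42, shared, taskowned). */
    static Map<String, Long> sizePerChild(Path jobCheckpointDir) {
        Map<String, Long> result = new TreeMap<>();
        try (Stream<Path> children = Files.list(jobCheckpointDir)) {
            children.filter(Files::isDirectory)
                    .forEach(c -> result.put(c.getFileName().toString(), sizeOf(c)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical usage: java CheckpointDirSize /path/to/checkpoints/<job-id>
        Path dir = Paths.get(args.length > 0 ? args[0] : ".");
        sizePerChild(dir).forEach((name, bytes) -> System.out.println(name + "\t" + bytes));
    }
}
```

Running it periodically and comparing the `shared` figure over time gives a rough view of whether incremental checkpoint data keeps accumulating. Files removed by Flink while the walk is in progress can make a run fail, so a failed run may simply need to be retried.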

Regarding the last log: if the TM thread dump is full of threads in TIMED_WAITING on AbstractQueuedSynchronizer, you may be hitting this bug: https://issues.apache.org/jira/browse/FLINK-12852
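Since the blocked threads differ from dump to dump, it can help to count threads by state and lock type instead of reading each stack by hand. Below is a minimal JDK-only sketch that does this for a saved dump. It assumes header lines in the form shown in the question (`"name" Id=N STATE on Lock@hash ...`). The class name `ThreadDumpSummary` and the dump file argument are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ThreadDumpSummary {

    // Matches thread header lines such as:
    //   "some task (1/2)#0" Id=232 TIMED_WAITING on some.Lock@783b47ad
    private static final Pattern HEADER =
            Pattern.compile("^\"(.*?)\" Id=(\\d+) (\\w+)(?: on (\\S+))?");

    /** Counts threads per "STATE on lock.Class" key; the @hash suffix is dropped. */
    static Map<String, Integer> countByStateAndLock(String dump) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : dump.split("\\R")) {
            Matcher m = HEADER.matcher(line);
            if (!m.find()) {
                continue; // stack frames and other non-header lines
            }
            String key = m.group(3);
            if (m.group(4) != null) {
                key += " on " + m.group(4).replaceFirst("@[0-9a-fA-F]+$", "");
            }
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            System.err.println("usage: java ThreadDumpSummary <dump-file>");
            return;
        }
        // Hypothetical usage: the argument is a text file holding the saved thread dump.
        String dump = new String(Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
        countByStateAndLock(dump).forEach((k, v) -> System.out.println(v + "\t" + k));
    }
}
```

A large count for `TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject` would match the pattern described above, while a large count for `BLOCKED on org.apache.flink.util.ChildFirstClassLoader` points at the class-loading contention visible in the first dump.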
