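I deploy my Flink job with flink-kubernetes-operator. I have also enabled checkpointing, and the checkpoint directory is a mounted PVC. The state backend is RocksDB with incremental checkpoints enabled. However, my job runs into some problems.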
Flink version: 1.14.4
flink-kubernetes-operator version: release-1.4.0, link: https://github.com/apache/flink-kubernetes-operator, commit: 7fc23a1
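I don't know how to debug this. When my job hangs, I look at the thread stacks and find blocked threads, but the blocking stack is different every time. Sometimes it looks like this: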
[arthas@1]$ thread -b
"Window(TumblingEventTimeWindows(60000), EventTimeTrigger, CommonWindowReduceFunction, PassThroughWindowFunction) -> Flat Map (22/72)#0" Id=157 BLOCKED on java.util.jar.JarFile@449cf4a0 owned by "Window(TumblingEventTimeWindows(60000), EventTimeTrigger, AppGroupTransactionEventApplyFunction) -> (Sink: Unnamed, Timestamps/Watermarks -> Flat Map) (31/32)#0" Id=151
at java.util.zip.ZipFile$ZipFileInputStream.read(ZipFile.java:719)
- blocked on java.util.jar.JarFile@449cf4a0
at java.util.zip.ZipFile$ZipFileInflaterInputStream.fill(ZipFile.java:434)
at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:158)
at sun.misc.Resource.getBytes(Resource.java:124)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:463)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:71)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
- locked org.apache.flink.util.ChildFirstClassLoader@697c9014 <---- but blocks 93 other threads!
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at java.util.ArrayList.readObject(ArrayList.java:797)
at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:543)
at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:485)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1612)
at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriterDelegate(StreamTask.java:1596)
at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:359)
at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:332)
at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:324)
at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:314)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.<init>(OneInputStreamTask.java:75)
at sun.reflect.GeneratedConstructorAccessor37.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1582)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:740)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
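You can also collect what `thread -b` reports from code, for example to log it periodically instead of attaching Arthas by hand. The JDK's `ThreadMXBean` exposes the same lock-owner information. The class below is a hypothetical helper, not part of Flink or Arthas, and it only sees the JVM it runs in. It lists BLOCKED threads and counts, per lock owner, how many threads are waiting on a monitor that owner holds. That count is the same signal as the "blocks 93 other threads" hint in the dump above.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical helper: an in-process approximation of `thread -b`.
 * For every BLOCKED thread it looks up the owner of the monitor it waits for,
 * then counts how many threads each owner is blocking.
 */
public class BlockedThreadReport {

    /** Returns lock-owner thread name -> number of threads BLOCKED on a monitor it holds. */
    public static Map<String, Integer> blockedCountsByOwner() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        // We only need lock-owner fields, so skip the (more expensive) locked-monitor details.
        ThreadInfo[] infos = mx.dumpAllThreads(false, false);
        Map<String, Integer> counts = new TreeMap<>();
        for (ThreadInfo info : infos) {
            if (info == null || info.getThreadState() != Thread.State.BLOCKED) {
                continue;
            }
            String owner = info.getLockOwnerName();
            counts.merge(owner == null ? "<unknown>" : owner, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        blockedCountsByOwner().forEach((owner, n) ->
                System.out.println(owner + " blocks " + n + " thread(s)"));
    }
}
```

If the same owner keeps showing up with a large count across several samples, its own stack is the one worth reading. In the first dump, that owner is itself blocked reading a JAR while holding the `ChildFirstClassLoader` lock.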
But other times it looks different:
"System Time Trigger for Window(TumblingEventTimeWindows(60000), EventTimeTrigger, CommonWindowReduceFunction, PassThroughWindowFunction) (32/48)#0" Id=232 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@783b47ad
at sun.misc.Unsafe.park(Native Method)
- waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@783b47ad
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
...
"KeyedProcess -> Sink: CCLOUD_APP_SPAN_PORTRAIT_DETECT (24/32)#0" Id=134 BLOCKED on org.apache.flink.util.ChildFirstClassLoader@53624f6b owned by "Window(TumblingEventTimeWindows(60000), EventTimeTrigger, CommonWindowReduceFunction, PassThroughWindowFunction) (32/48)#0" Id=92
at java.lang.Class.getDeclaredFields0(Native Method)
- blocked on org.apache.flink.util.ChildFirstClassLoader@53624f6b
at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
at java.lang.Class.getDeclaredField(Class.java:2068)
at java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1857)
at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:79)
at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:506)
at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
at java.security.AccessController.doPrivileged(Native Method)
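Regarding the first issue: if your state is very large, much larger than your managed memory (possibly well over 4 GB), Flink will generate a lot of disk I/O when you restart the job from a savepoint/checkpoint.
If you find that the disk has hit its IOPS limit, upgrading the disk can work around the problem temporarily, but in the end you need to reduce the state size.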
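The most reliable view of IOPS is the disk metrics from your storage or cloud provider. For a rough in-pod check that the volume is slow, you can time a read of the files under the mounted directory. The sketch below is a hypothetical helper, and the default path `/flink-checkpoints` is an assumption; pass your real PVC mount path instead. It measures sequential read throughput, not IOPS, and a second run may be inflated by the OS page cache.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper: reads every file under a directory once and reports throughput. */
public class DirReadThroughput {

    /** Reads all regular files under dir; returns {bytesRead, elapsedNanos}. */
    public static long[] readAll(Path dir) throws IOException {
        List<Path> files;
        try (Stream<Path> walk = Files.walk(dir)) {
            files = walk.filter(Files::isRegularFile).collect(Collectors.toList());
        }
        byte[] buf = new byte[1 << 20]; // 1 MiB read buffer
        long bytes = 0;
        long start = System.nanoTime();
        for (Path f : files) {
            try (InputStream in = Files.newInputStream(f)) {
                int n;
                while ((n = in.read(buf)) != -1) {
                    bytes += n;
                }
            }
        }
        return new long[] {bytes, System.nanoTime() - start};
    }

    public static void main(String[] args) throws IOException {
        // Assumed default path; pass the actual PVC mount path as the first argument.
        Path dir = Paths.get(args.length > 0 ? args[0] : "/flink-checkpoints");
        long[] r = readAll(dir);
        double seconds = r[1] / 1e9;
        double mib = r[0] / (1024.0 * 1024.0);
        System.out.printf("read %.1f MiB in %.2f s (%.1f MiB/s)%n",
                mib, seconds, seconds > 0 ? mib / seconds : 0.0);
    }
}
```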
Regarding the last log: if you find that the TaskManager thread dump is full of threads in TIMED_WAITING inside AbstractQueuedSynchronizer, you may be hitting this bug: https://issues.apache.org/jira/browse/FLINK-12852
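To judge whether a dump is "full of" such threads, you can count them across several samples rather than eyeballing one dump. The sketch below is a hypothetical helper that counts TIMED_WAITING threads with an `AbstractQueuedSynchronizer` frame on their stack. A high count alone does not confirm FLINK-12852. Idle pool workers also match: the "System Time Trigger" thread above is a `ScheduledThreadPoolExecutor` worker waiting in `DelayedWorkQueue.take` for its next task. You still need to read the stacks that dominate the count.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

/** Hypothetical helper: counts TIMED_WAITING threads parked inside AbstractQueuedSynchronizer. */
public class AqsTimedWaitingCount {

    // Also matches inner classes such as AbstractQueuedSynchronizer$ConditionObject.
    private static final String AQS_PREFIX = "java.util.concurrent.locks.AbstractQueuedSynchronizer";

    public static int countTimedWaitingInAqs() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        int count = 0;
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (info == null || info.getThreadState() != Thread.State.TIMED_WAITING) {
                continue;
            }
            for (StackTraceElement frame : info.getStackTrace()) {
                if (frame.getClassName().startsWith(AQS_PREFIX)) {
                    count++;
                    break; // count each thread once
                }
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("TIMED_WAITING in AQS: " + countTimedWaitingInAqs());
    }
}
```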