I am running a Flink 1.13.1 cluster on which I execute batch jobs that run Athena queries and save the results into an Athena table.
I submit these jobs several times a day.
With every execution, the metaspace used by the cluster grows by 7 MB and is never released.
Every few days I get an OutOfMemoryError caused by metaspace, and I need to restart the cluster to clear it.
taskmanager.memory.jvm-metaspace.size is set to 24gb.
Any idea what could be causing this metaspace growth and why it is not being released?
I came across some Flink Jira tickets claiming the issue was resolved, but I am still facing the same problem. https://issues.apache.org/jira/browse/FLINK-16408 https://issues.apache.org/jira/browse/FLINK-19005
This can happen when code you are using holds on to classes. I had to fix a similar problem for a Flink job where the Hadoop FileSystem was being used (via Hudi). There were actually two issues:
- The Hadoop FileSystem fires up a statistics-cleanup daemon thread that is never interrupted, so it keeps running and hangs onto the Flink user classloader.
- Hadoop's ShutdownHookManager registers a shutdown hook with the JVM that also hangs onto the user classloader; since the TaskManager's JVM isn't shut down on a job restart, that hook leaks the classloader.
Fixing both required some gymnastics - basically I added code to the close() method of the Hudi-related Flink operators that (a) finds and interrupts the daemon thread, and (b) uses reflection to get at the JVM's list of shutdown hooks and then explicitly runs the Hadoop hook.
Here is a skeleton of the Flink operator code containing my fix:
import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SomeFunction extends RichMapFunction<In, Out> {

    private static final Logger LOGGER = LoggerFactory.getLogger(SomeFunction.class);

    private static final AtomicBoolean CLEANUP_HADOOP = new AtomicBoolean();

    @Override
    public void open(Configuration parameters) throws Exception {
        // Every instance of this operator running in the same TM will set this true,
        // but that's OK.
        CLEANUP_HADOOP.set(true);
    }

    @Override
    public void close() throws Exception {
        // We only want to do this cleanup work once per TM, so if the value is true
        // for us, it will be false for all other instances running in this JVM.
        if (CLEANUP_HADOOP.getAndSet(false)) {
            interruptHadoopStatisticsThread();
            runHadoopShutdownHook();
        }

        super.close();
    }

    /**
     * We have the Hadoop ShutdownHookManager, which has a shutdown hook that's registered with the
     * JVM. This winds up hanging onto the Flink user classloader, and since the TM's JVM isn't shut
     * down when doing a job restart, we wind up leaking.
     */
    @SuppressWarnings("unchecked")
    private void runHadoopShutdownHook() {
        try {
            Class<?> clazz = Class.forName("java.lang.ApplicationShutdownHooks");
            Field field = clazz.getDeclaredField("hooks");
            field.setAccessible(true);
            Map<Thread, Thread> hooks = (Map<Thread, Thread>) field.get(null);

            for (Thread hook : hooks.keySet()) {
                if (hook.getClass()
                        .getName()
                        .contains("org.apache.hadoop.util.ShutdownHookManager")) {
                    LOGGER.info("Removing/running Hadoop ShutdownHookManager shutdown hook");
                    hooks.remove(hook);
                    hook.run();

                    try {
                        hook.join();
                    } catch (InterruptedException x) {
                        // Restore the interrupt flag; we're tearing down anyway.
                        Thread.currentThread().interrupt();
                    }

                    break;
                }
            }
        } catch (Throwable t) {
            LOGGER.error("Error trying to get/run Hadoop shutdown hook", t);
        }
    }

    /**
     * There's a problem where the Hadoop FileSystem fires up a daemon thread to clean up statistics
     * data, but that thread never is interrupted, so it keeps running (and hanging onto a
     * classloader). We want to explicitly interrupt it.
     */
    private void interruptHadoopStatisticsThread() {
        Thread[] activeThreads = new Thread[Thread.activeCount() + 10];
        int numThreads = Thread.enumerate(activeThreads);

        for (int i = 0; i < numThreads; i++) {
            Thread t = activeThreads[i];
            if (t.getName()
                    .equals(
                            "org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner")) {
                LOGGER.info("Interrupting Hadoop FileSystem daemon thread");
                t.interrupt();
                break;
            }
        }
    }
}
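If you want to sanity-check that the cleanup is taking effect, a small helper like the one below (hypothetical, not part of my original fix) can be called from a test or logged after close(); it just looks for the same thread name the operator searches for:

public final class HadoopThreadCheck {

    private HadoopThreadCheck() {}

    /** Returns true if the Hadoop statistics cleaner daemon thread is still running in this JVM. */
    public static boolean statisticsCleanerAlive() {
        Thread[] threads = new Thread[Thread.activeCount() + 10];
        int numThreads = Thread.enumerate(threads);
        for (int i = 0; i < numThreads; i++) {
            if ("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner"
                    .equals(threads[i].getName())) {
                return true;
            }
        }
        return false;
    }
}

If this still reports the thread alive after a job finishes, the daemon thread (and with it the user classloader) is still being retained and metaspace will keep growing.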