Why doesn't the Flink core node release JVM Metaspace memory?

Question

I am running a Flink 1.13.1 cluster on which I execute batch jobs that run Athena queries and save the results into an Athena table.

I submit these jobs several times a day.

With every execution, the Metaspace used by the cluster grows by 7 MB and is never released.

Every few days an OutOfMemoryError caused by Metaspace exhaustion occurs, and I have to restart the cluster to clear the Metaspace.

taskmanager.memory.jvm-metaspace.size is set to 24gb.
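For reference, in flink-conf.yaml that setting looks like the following (a sketch; the key is the standard Flink option, the value is the one quoted above):

# flink-conf.yaml
taskmanager.memory.jvm-metaspace.size: 24gb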

Any idea what could be causing this Metaspace growth, and why it is not being released?

I came across some Flink JIRA tickets claiming the issue had been fixed, but I am still facing the same problem: https://issues.apache.org/jira/browse/FLINK-16408 https://issues.apache.org/jira/browse/FLINK-19005

apache-flink amazon-emr
1 Answer

This can happen when code you are using holds on to classes. I had to fix a similar problem in a Flink job where the Hadoop FileSystem was being used (via Hudi). There were actually two issues:

  1. Hadoop creates a daemon thread that never terminates; the thread hangs on to a classloader, which in turn hangs on to classes (a minimal sketch of this pinning effect follows this list).
  2. Hadoop installs a JVM shutdown hook, which also hangs on to the classloader.
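
To make the first issue concrete, here is a minimal, hypothetical sketch (not taken from the Flink/Hudi job itself) of how a lingering daemon thread pins a classloader: as long as the thread is alive, its context classloader (standing in here for Flink's per-job user-code classloader) stays strongly reachable, so none of that loader's classes can be unloaded from Metaspace.

import java.net.URL;
import java.net.URLClassLoader;

public class ClassLoaderLeakSketch {

    public static void main(String[] args) throws Exception {
        // Stand-in for the per-job Flink user-code classloader.
        URLClassLoader jobClassLoader = new URLClassLoader(new URL[0]);

        // A daemon thread similar to Hadoop's statistics cleaner: it never exits on
        // its own, and while it is alive it strongly references its context
        // classloader, so that loader's classes cannot be unloaded.
        Thread daemon = new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(60_000L);
                } catch (InterruptedException e) {
                    return; // only an explicit interrupt ends the thread
                }
            }
        }, "leaky-cleanup-thread");
        daemon.setContextClassLoader(jobClassLoader);
        daemon.setDaemon(true);
        daemon.start();

        // The "job" finishes and drops its reference to the loader...
        jobClassLoader = null;
        System.gc();
        // ...but the loader is still reachable through the live thread, so its
        // classes stay in Metaspace until something interrupts the thread.
    }
}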

Fixing both of these issues required some gymnastics. Basically, I added code to the close() method of the Hudi-related Flink operator that (a) finds and interrupts the daemon thread, and (b) uses reflection to get at the JVM's list of shutdown hooks and then explicitly runs the Hadoop hook.

Here is a skeleton of the Flink operator code containing my fix:

import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SomeFunction extends RichMapFunction<In, Out> {
    private static final Logger LOGGER = LoggerFactory.getLogger(SomeFunction.class);

    private static final AtomicBoolean CLEANUP_HADOOP = new AtomicBoolean();

    @Override
    public void open(Configuration parameters) throws Exception {

        // Every instance of this operator running in the same TM will set this true,
        // but that's OK.
        CLEANUP_HADOOP.set(true);
    }

    @Override
    public void close() throws Exception {

        // We only want to do this cleanup work once per TM, so if the value is true
        // for us, it will be false for all other instances running in this JVM.

        if (CLEANUP_HADOOP.getAndSet(false)) {
            interruptHadoopStatisticsThread();
            runHadoopShutdownHook();
        }

        super.close();
    }

    /**
     * We have the Hadoop ShutdownHookManager, which has a shutdown hook that's registered with the
     * JVM. This winds up hanging onto the Flink user classloader, and since the TM's JVM isn't shut
     * down when doing a job restart, we wind up leaking.
     */
    @SuppressWarnings("unchecked")
    private void runHadoopShutdownHook() {

        try {
            Class<?> clazz = Class.forName("java.lang.ApplicationShutdownHooks");
            Field field = clazz.getDeclaredField("hooks");
            field.setAccessible(true);
            Map<Thread, Thread> hooks = (Map<Thread, Thread>) field.get(null);
            for (Thread hook : hooks.keySet()) {
                if (hook.getClass()
                        .getName()
                        .contains("org.apache.hadoop.util.ShutdownHookManager")) {
                    LOGGER.info("Removing/running Hadoop ShutdownHookManager shutdown hook");
                    hooks.remove(hook);
                    hook.run();
                    try {
                        hook.join();
                    } catch (InterruptedException x) {
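                        // Interrupted while waiting for the hook to finish; we are
                        // shutting down anyway, so ignore it.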
                    }

                    break;
                }
            }
        } catch (Throwable t) {
            LOGGER.error("Error trying to get/run Hadoop shutdown hook", t);
        }
    }

    /**
     * There's a problem where the Hadoop FileSystem fires up a daemon thread to clean up statistics
     * data, but that thread never is interrupted, so it keeps running (and hanging onto a
     * classloader). We want to explicitly interrupt it.
     */
    private void interruptHadoopStatisticsThread() {
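        // Thread.activeCount() is only an estimate, so pad the array to make sure
        // Thread.enumerate() can copy every live thread in this thread group.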
        Thread[] activeThreads = new Thread[Thread.activeCount() + 10];
        int numThreads = Thread.enumerate(activeThreads);
        for (int i = 0; i < numThreads; i++) {
            Thread t = activeThreads[i];
            if (t.getName()
                    .equals(
                            "org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner")) {
                LOGGER.info("Interrupting Hadoop FileSystem daemon thread");
                t.interrupt();
                break;
            }
        }
    }
}