Flink IOException: Insufficient number of network buffers

问题描述 投票:0回答:2

我正在使用

Flink v1.4.0
。我正在使用
DataSet API
(尽管我认为这并不重要)。

我正在 12 核 VM 上运行一些重型转换。我正在为一个

Flink job
使用 2 个内核,其中我将一些数据存储到一个
Flink Queryable State
中,并使用剩余的 10 个内核运行另一个
Flink
作业。

当我用 10 个内核运行第二个作业时,我似乎得到以下错误:

java.io.IOException: Insufficient number of network buffers: required 10, but only 9 available. The total number of network buffers is currently set to 4096 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.network.memory.fraction', 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
            at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:257)
            at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:199)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:618)
            at java.lang.Thread.run(Thread.java:745)

如果我用 8 个内核运行它,它就能顺利通过。是什么原因造成的,为什么我不能使用其他 2 --> 8+2 = 10 个核心?

java apache-flink
2个回答
10
投票

引用 Apache Flink FAQ:

如果你以非常高的并行度运行 Flink,你可能需要增加网络缓冲区的数量。

默认情况下,Flink 会占用 JVM 堆大小的 10% 作为网络缓冲区,最小为 64MB,最大为 1GB。您可以通过 taskmanager.network.memory.fraction、taskmanager.network.memory.min 和 taskmanager.network.memory.max 调整所有这些值。

详情请参考配置参考

文档中有一个专门的部分介绍如何配置网络缓冲区

综上所述,可以通过设置

./conf/flink-conf.yaml
参数在
taskmanager.network.numberOfBuffers
文件中配置网络缓冲区的数量

参数应设置为

#slots-per-TM^2 * #TMs * 4
,其中
#slots per TM
是每个TaskManager的槽数,
#TMs
是任务管理器的总数。

例如,为了支持 20 台 8 槽机器的集群,您应该使用大约 5000 个网络缓冲区以获得最佳吞吐量。默认情况下,每个网络缓冲区的大小为 32 KiBytes。在上面的示例中,系统将因此分配大约 300 MiBytes 用于网络缓冲区。

详情请参考文档


2
投票

我也遇到了同样的错误

引起:java.io.IOException: Insufficient number of network 缓冲区:需要 13 个,但只有 7 个可用。总人数 网络缓冲区当前设置为每个 32768 字节中的 2048。你可以 通过设置配置键来增加这个数字 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min',和 'taskmanager.memory.network.max'.

下面的代码片段解决了我的问题。

Configuration cfg = new Configuration();
int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
cfg.setString("taskmanager.memory.network.max", "1gb");
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(defaultLocalParallelism, cfg);
© www.soinside.com 2019 - 2024. All rights reserved.