我正在阅读有 5 个分区的 kafka 主题。由于 5 个内核不足以处理负载,我正在将输入重新分区为 30 个。我为我的 spark 进程提供了 30 个内核,每个执行程序上有 6 个内核。通过此设置,我假设每个执行者将获得 6 个任务。但我们经常看到一个执行者得到 4 个任务,而其他执行者得到 7 个任务。它扭曲了我们工作的处理时间。
有人能帮我理解为什么所有的执行者不会得到相同数量的任务吗?这是作业运行 12 小时后的执行器指标。
地址 | 状态 | RDD 块 | 存储内存 | 使用的磁盘 | 核心 | 活动任务 | 失败的任务 | 完成任务 | 总任务 | 任务时间(GC时间) | 输入 | 随机阅读 | 随机写入 |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
ip1:36759 | 活跃 | 7 | 1.6 MB / 144.7 MB | 0.0 乙 | 6 | 6 | 0 | 442506 | 442512 | 35.9 小时(26 分钟) | 42.1GB | 25.9GB | 24.7GB |
ip2:36689 | 活跃 | 0 | 0.0 B / 128 MB | 0.0 乙 | 0 | 0 | 0 | 0 | 0 | 0 毫秒(0 毫秒) | 0.0 乙 | 0.0 乙 | 0.0 乙 |
ip5:44481 | 活跃 | 7 | 1.6 MB / 144.7 MB | 0.0 乙 | 6 | 6 | 0 | 399948 | 399954 | 29.0 小时(20 分钟) | 37.3GB | 22.8GB | 24.7GB |
ip1:33187 | 活跃 | 7 | 1.5 MB / 144.7 MB | 0.0 乙 | 6 | 5 | 0 | 445720 | 445725 | 35.9 小时(26 分钟) | 42.4GB | 26GB | 24.7GB |
ip3:34935 | 活跃 | 7 | 1.6 MB / 144.7 MB | 0.0 乙 | 6 | 6 | 0 | 427950 | 427956 | 33.8 小时(23 分钟) | 40.5GB | 24.8GB | 24.7GB |
ip4:38851 | 活跃 | 7 | 1.7 MB / 144.7 MB | 0.0 乙 | 6 | 6 | 0 | 410276 | 410282 | 31.6 小时(24 分钟) | 39GB | 23.9GB | 24.7GB |
如果您看到 ip5:44481 完成的任务数量存在偏差。我也没有看到异常的 GC 活动。
我应该查看哪些指标来理解这种偏差?
更新
进一步调试后,我可以看到所有分区的数据都不相等。并且所有任务都给出了大约相同数量的记录。
执行者ID | 地址 | 任务时间 | 总任务 | 失败的任务 | 已终止的任务 | 成功的任务 | 随机读取大小/记录 | 黑名单 |
---|---|---|---|---|---|---|---|---|
0 stdout stderr |
ip3:37049 | 0.8 秒 | 6 | 0 | 0 | 6 | 600.9 KB / 272 | 假 |
1 stdout stderr |
ip1:37875 | 0.6 秒 | 6 | 0 | 0 | 6 | 612.2 KB / 273 | 假 |
2 stdout stderr |
ip3:41739 | 0.7 秒 | 5 | 0 | 0 | 5 | 529.0 KB / 226 | 假 |
3 stdout stderr |
ip2:38269 | 0.5 秒 | 6 | 0 | 0 | 6 | 623.4 KB / 272 | 假 |
4 stdout stderr |
ip1:40083 | 0.6 秒 | 7 | 0 | 0 | 7 | 726.7 KB / 318 | 假 |
这是重新分区后阶段的统计数据。我们可以看到任务数量与记录数量成正比。下一步,我将尝试查看分区函数的工作原理。
更新二:
我遇到的唯一解释是 spark 使用循环分区。并且它在每个分区上独立执行。例如,如果 node1 上有 5 条记录,node2 上有 7 条记录。 Node1 的循环将分发大约 3 条记录给节点 1,大约 2 条记录给节点 2。 Node2 的轮询会将大约 4 条记录分配给 node1,将大约 3 条记录分配给 node2。因此,有可能在节点 1 上有 7 条记录,在节点 2 上有 5 条记录,具体取决于在每个单独节点的框架代码中解释的节点顺序。 来源
注意: 如果你注意到表现最好的人都在同一个 IP 上。是因为洗牌后在同一主机上传输数据更快吗?与其他 ip 相比?
根据以上数据,我们可以看到重新分区工作正常,即将相同数量的记录分配给 30 个分区,但问题是为什么有些执行者获得的分区比其他执行者多。