如何在Apache Spark中实现任务的动态负载平衡

问题描述 投票:5回答:3

我知道在Spark中我可以通过使用多个分区来分割我的计算。如果说我可以将输入RDD拆分为1000个分区并且我的机器数量为100,那么Spark会将计算分成1000个任务,并以某种智能方式将它们动态分配到我的100台机器中。

现在假设我最初可以将数据拆分为仅2个分区,但我仍然有100台机器。当然,我的98台机器将闲置。但是当我处理每个任务时,我可能会将其拆分为可能在不同机器上执行的子任务。它可以在带有队列的普通Java中轻松实现,但我不确定在Apache Spark中攻击它的最佳方法是什么。

考虑以下Java伪代码:

BlockingQueue<Task> q = new LinkedBlockingQueue<Task>();
q.push(myInitialTask);
...
//On each thread:
while (!queue.isEmpty()) {
    Task nextTask = queue.take();
    List<Task> newTasks = process_task_and_split_to_sub_tasks(nextTask);
    queue.pushAll(newTasks);
} 

假设方法'process_task_and_split_to_sub_tasks()'可以将任何大型任务拆分为多个较小的任务,上述Java代码将使我的所有100个线程保持忙碌。

有没有办法在Spark中实现相同的功能,可能与其他工具结合使用?

更新:已经正确地指出,攻击它的方法之一就是

  1. 生成更精细的密钥和
  2. 然后使用智能分区程序将这些密钥分配给分区。

我想这是解决此问题的“经典”方法,但它要求我能够正确估计每个键的工作量以正确分区。如果我没有提前知道每个密钥的工作量怎么办?当我的大部分机器都闲置等待一些不幸的机器时,我可能最终会遇到非常不幸的分区。

示例:我们以简化的频繁项集挖掘为例。 假设我的文件包含带有从a到j(10个字母)的字母的行,每行中的所有字母都按字母顺序排序而不重复,例如, 'abcf',任务是找到50%的所有行中存在的所有字母组合。例如。如果许多行匹配模式'ab。* f',那么输出将包含{'a','b','f','ab','af','bf','abf'}。 实现它的方法之一是将所有以'a'开头的行发送到一个mapper(机器),所有行以'b'开头到另一个等等。顺便说一句,这就是frequent pattern mining is implemented in Spark。现在假设我有100台机器(但只有10个字母)。然后我的90台机器将闲置。 使用更精细的密钥解决方案,我可以生成10,000个4个字母的前缀,然后根据每个前缀的估计工作以某种方式对它们进行分区。但是我的分区可能非常错误:如果大多数行以'abcd'开头,那么所有的工作都将由负责此前缀的机器完成(除了它之外可能还有其他前缀),再次产生一个当我的大多数机器闲置等待一些不幸的机器时的情况。

在这种情况下,动态负载平衡将是这样的:接收到以“a”开头的行的映射器可能希望进一步分割其行 - 以'ab','ac','ad'开头, ...然后将它们发送给其他10台机器,这些机器可能会决定将其工作进一步分解为更多任务。 我知道标准的Apache Spark没有开箱即用的答案,但我想知道是否有办法实现这一目标。

Kafka(即上面的队列)+ Spark Streaming看起来很有前途,您认为我能够以相对简单的方式使用这些工具来实现动态负载平衡吗?你能推荐其他工具吗?

apache-spark spark-streaming load-balancing job-scheduling
3个回答
2
投票

Spark自己的动态分配可以在某种程度上模拟你想要的东西,但是如果你需要一个低级别控制的详细,高性能的方法,那么Spark不适合你。对于初学者,您将无法动态分割任务 - 您只能调整分配给应用程序的总体资源。

您应该考虑使用低级调度程序并从头开始实现自己的解决方案。


2
投票

要归档您的需求,您可以将数据从两个分区重新分区到您想要的任何数量的分区。

https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/api/java/JavaPairRDD.html#repartition-int-

BTW,spark Streaming与你的问题无关。

请注意,并行度不仅取决于数据集的分区,还取决于我们的作业/算法。


2
投票

现在假设我有100台机器(但只有10个字母)。接收到以'a'开头的行的映射器可能希望进一步分割其行 - 以'ab','ac','ad'等开头,然后将它们发送到其他10台机器。

这不是Spark的工作原理。 “Mapper”(任务)主要是对所有分布式上下文一无所知。在这个级别上没有访问SparkContext,我们更长时间有RDDs,只需输入本地Iterator和代码即可执行。它无法启动,也无法创建新任务。

与此同时,您的问题定义是人为的。要找到频繁模式,您必须聚合数据,因此您需要随机播放。在这一点上,对应于给定模式的记录必须被混洗到同一台机器上。确保数据的正确分布是Partitioner的工作,这里没有“分裂”的地方。

© www.soinside.com 2019 - 2024. All rights reserved.