我如何在Java中并行运行某些东西?

问题描述 投票:0回答:1

我正在尝试打印范围内的所有可能组合。例如,如果我的lowerBound为3而我的max为5,则需要以下组合:(5,4-5,3-4,3)。我已经使用下面的helper()函数实现了这一点。

当然,如果我的max很大,这是很多组合,这将需要很长时间。这就是为什么我尝试实现ForkJoinPool,以便任务并行运行的原因。为此,我创建了一个新的ForkJoinPool。然后,我遍历r的所有可能值(其中r是组合中的数量,在上面的示例r=3中)。对于r的每个值,我创建一个新的HelperCalculator,它扩展了RecursiveTask<Void>。在其中,我递归调用helper()函数。每次我调用此代码时,我都会创建一个新的HelperCalculator,并在其上使用.fork()

问题如下。它不能正确生成所有可能的组合。实际上,它根本不生成任何组合。我曾尝试在calculator.join()之后添加calculator.fork(),但这一直无限进行,直到出现OutOfMemory错误。

很明显,我对ForkJoinPool有一些误解,但尝试了几天后,我再也看不清。

我的主要功能:

            ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
            for (int r = 1; r < 25; r++) {
                int lowerBound = 7;
                int[] data = new int[r];
                int max = 25;
                calculator = new HelperCalculator(data, 0, max, 0, s, n, lowerBound);
                pool.execute(calculator);
                calculator.join();
            }
            pool.shutdown();

HelperCalculator类:

    protected Void compute() {
        helper(data, end, start, index, s, lowerBound);
        return null;
    }

    //Generate all possible combinations
    public void helper(int[] data , int end, int start, int index,int s, int lowerBound) {
        //If the array is filled, print it
        if (index == data.length) {
                System.out.println(Arrays.toString(data));
        } else if (start >= end) {
            data[index] = start;
            if(data[0] >= lowerBound) {
                HelperCalculator calculator = new HelperCalculator(data,end, start-1, index+1, s, n, lowerBound);
                calculator.fork();
                calculators.add(calculator);
                HelperCalculator calculator2 = new HelperCalculator(data, end, start-1, index, s, n, lowerBound);
                calculator2.fork();
                calculators.add(calculator2);
            }
        }

如何使每个HelperCalculator并行运行,以便使用ForkJoinPool同时运行23个?还是应该使用其他解决方案?

我已经尝试在join()列表上调用isDone()calculators,但是它没有等待它正确完成并且程序刚刚退出。

因为有人不了解该算法,所以它是:

    public static void main(String[] args) {
            for(int r = 3; r > 0; r--) {
                int[] data = new int[r];
                helper(data, 0, 2, 0);
            }
    }

    public static void helper(int[] data , int end, int start, int index) {
        if (index == data.length) {
            System.out.println(Arrays.toString(data));
        } else if (start >= end) {
            data[index] = start;
                helper(data, end, start - 1, index + 1);
                helper(data, end, start - 1, index);
            }
        }
    }

此输出为:

[2, 1, 0]
[2, 1]
[2, 0]
[1, 0]
[2]
[1]
[0]
java multithreading concurrency executorservice forkjoinpool
1个回答
0
投票

您要分派的某些任务会尝试使用相同的数组来评估不同的组合。您可以通过为每个任务创建一个不同的数组或将并行性限制为那些已经拥有数组的任务(即长度不同的任务)来解决此问题。

但是还有另一种可能性;根本不使用数组。您可以将组合存储到int值中,因为每个int值都是位的组合。这样不仅节省了很多内存,而且您只需增加值就可以轻松地迭代所有可能的组合,因为对所有int数字进行迭代也可以对所有可能的比特组合进行迭代¹。我们唯一需要实现的就是通过根据位的位置将其解释为数字,从而为特定的int值生成正确的字符串。

首先,我们可以使用简单的方法并使用现有的类:

public static void main(String[] args) {
    long t0 = System.nanoTime();
    combinations(10, 25);
    long t1 = System.nanoTime();
    System.out.println((t1 - t0)/1_000_000+" ms");
    System.out.flush();
}
static void combinations(int start, int end) {
    for(int i = 1, stop = (1 << (end - start)) - 1; i <= stop; i++) {
        System.out.println(
            BitSet.valueOf(new long[]{i}).stream()
                  .mapToObj(b -> String.valueOf(b + start))
                  .collect(Collectors.joining(", ", "[", "]"))
        );
    }
}

该方法使用一个异端,因此对于您的示例,您必须像combinations(0, 3)那样调用它,它将打印出来

[0]
[1]
[0, 1]
[2]
[0, 2]
[1, 2]
[0, 1, 2]
3 ms

当然,时间可能有所不同

对于上面的combinations(10, 25)示例,它将打印所有组合,然后在我的机器上打印3477 ms。这听起来像是一个优化的机会,但是我们应该首先考虑哪些操作会带来哪些成本。

组合的迭代在这里已简化为微不足道的操作。创建字符串要贵一个数量级。但这与包括将数据传输到操作系统的实际打印相比仍然是什么,并且取决于系统,实际的渲染可能会增加我们的时间。由于这是在PrintStream内保持锁定的同时完成的,因此所有试图同时打印的线程都将被阻塞,从而使其成为不可并行的操作。

通过创建一个新的PrintStream,禁用换行符的自动刷新功能,并使用一个能够容纳整个输出的超大缓冲区来确定成本的一部分:

public static void main(String[] args) {
    System.setOut(new PrintStream(
        new BufferedOutputStream(new FileOutputStream(FileDescriptor.out), 1<<20), false));
    long t0 = System.nanoTime();
    combinations(10, 25);
    long t1 = System.nanoTime();
    System.out.flush();
    long t2 = System.nanoTime();
    System.out.println((t1 - t0)/1_000_000+" ms");
    System.out.println((t2 - t0)/1_000_000+" ms");
    System.out.flush();
}
static void combinations(int start, int end) {
    for(int i = 1, stop = (1 << (end - start)) - 1; i <= stop; i++) {
        System.out.println(
            BitSet.valueOf(new long[]{i}).stream()
                  .mapToObj(b -> String.valueOf(b + start))
                  .collect(Collectors.joining(", ", "[", "]"))
        );
    }
}

在我的机器上,它按照以下顺序打印内容:>

93 ms
3340 ms

显示代码在不可并行打印上花费了三秒钟以上,而在计算上仅花费了约100毫秒。为了完整起见,下面的代码在String生成中进行了下调:

static void combinations(int start, int end) {
    for(int i = 1, stop = (1 << (end - start)) - 1; i <= stop; i++) {
        System.out.println(bits(i, start));
    }
}
static String bits(int bits, int offset) {
    StringBuilder sb = new StringBuilder().append('[');
    for(;;) {
        int bit = Integer.lowestOneBit(bits), num = Integer.numberOfTrailingZeros(bit);
        sb.append(num + offset);
        bits -= bit;
        if(bits == 0) break;
        sb.append(", ");
    }
    return sb.append(']').toString();
}

这将我的计算机上的计算时间减半,而对总时间没有明显的影响,现在就不足为奇了。


但是出于教育目的,忽略潜在的加速不足,让我们讨论如何并行执行此操作。

顺序代码确实已经使任务变成一种形式,可以归结为从起始值到结束值的迭代。现在,我们将此代码重写为ForkJoinTask(或合适的子类),该public class Combinations extends RecursiveAction { public static void main(String[] args) { System.setOut(new PrintStream( new BufferedOutputStream(new FileOutputStream(FileDescriptor.out), 1<<20), false)); ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool(); long t0 = System.nanoTime(); Combinations job = Combinations.get(10, 25); pool.execute(job); job.join(); long t1 = System.nanoTime(); System.out.flush(); long t2 = System.nanoTime(); System.out.println((t1 - t0)/1_000_000+" ms"); System.out.println((t2 - t0)/1_000_000+" ms"); System.out.flush(); } public static Combinations get(int min, int max) { return new Combinations(min, 1, (1 << (max - min)) - 1); } final int offset, from; int to; private Combinations(int offset, int from, int to) { this.offset = offset; this.from = from; this.to = to; } @Override protected void compute() { ArrayDeque<Combinations> spawned = new ArrayDeque<>(); while(getSurplusQueuedTaskCount() < 2) { int middle = (from + to) >>> 1; if(middle == from) break; Combinations forked = new Combinations(offset, middle, to); forked.fork(); spawned.addLast(forked); to = middle - 1; } performLocal(); for(;;) { Combinations forked = spawned.pollLast(); if(forked == null) break; if(forked.tryUnfork()) forked.performLocal(); else forked.join(); } } private void performLocal() { for(int i = from, stop = to; i <= stop; i++) { System.out.println(bits(i, offset)); } } static String bits(int bits, int offset) { StringBuilder sb = new StringBuilder().append('['); for(;;) { int bit = Integer.lowestOneBit(bits), num = Integer.numberOfTrailingZeros(bit); sb.append(num + offset); bits -= bit; if(bits == 0) break; sb.append(", "); } return sb.append(']').toString(); } } 代表具有开始和结束值的迭代。然后,通过在中间范围内分割范围,我们增加了将此操作分为两部分的功能,因此我们得到了在范围的每一半上迭代的两个任务。可以重复执行此操作,直到我们决定有足够的潜在并行作业并在本地执行当前迭代为止。本地处理后,我们必须等待拆分的所有任务的完成,以确保根任务的完成意味着所有子任务的完成。

getSurplusQueuedTaskCount()

getSurplusQueuedTaskCount()为我们提供了有关工作线程饱和的暗示,换句话说,分叉更多工作是否有益。将返回的数字与通常较小的阈值进行比较,作业越异构,因此预期的工作负荷就应该越高,这是阈值,以便在作业比其他作业更早完成时可以进行更多的窃取工作。在我们的情况下,预计工作量将非常均衡。

有两种分割方法。示例通常创建两个或多个分叉的子任务,然后将它们联接。这可能导致大量任务仅在等待其他任务。另一种方法是派生一个子任务并更改当前任务,以代表另一个任务。这里,分叉的任务代表[middle, to]范围,而当前任务被修改为代表[from, middle]范围。

分叉足够的任务后,剩余范围将在当前线程中本地处理。然后,该任务将等待分叉的子任务,并进行一项优化:如果没有其他工作线程窃取了这些子任务,它将try to unfork子任务在本地处理它们。

这很顺利,但不幸的是,正如预期的那样,它不会加速操作,因为最昂贵的部分是打印。


¹使用int表示所有组合可将支持的范围长度减少到31,但是请记住,这样的范围长度意味着2³¹ - 1,要进行大量迭代。如果仍然觉得有限制,则可以更改代码以改为使用long。当时支持的范围长度63(换言之2⁶³ - 1组合)足以使计算机忙于宇宙的尽头。

© www.soinside.com 2019 - 2024. All rights reserved.