使用多线程对Java进行奇偶排序

问题描述 投票:0回答:1

我是这个小组的新手,所以我相信有可能在这里获得帮助,因为我在谷歌上找不到有关我的问题的任何信息。

我正在尝试并行实现Java EvenOdd 转置排序。因此,当我算法时,我认为划分分区是一个好主意:

  1. 如何知道我应该将数组列表分为多少部分?例如,我现在使用 17 个元素,以便更容易理解。
  2. 另外,我不知道是否应该使用称为 ExecutorService 的东西,或者只是正常创建线程。

我在这里添加我当前的逻辑:从奇数阶段开始,分成两部分,分配两个线程进行这些比较,然后创建一个 Barrier 来等待线程,从而启动其他线程以类似方式处理偶数索引。感谢您能给我的任何帮助。目前,我不知道如何实现这个算法,所以任何文字都可能有帮助。

java multithreading parallel-processing bubble-sort
1个回答
2
投票

如何知道我应该将数组列表分为多少部分?例如,我现在使用 17 个元素,以便更容易理解。

您将数组划分为子数组的直觉是正确的,因为它通常是并发排序算法的基础。正如您已经了解的算法一样,我们只需要讨论实现即可:

  1. 直观的解决方案是为每个奇数索引创建一个
    thread
    ,为所有奇数索引创建一个
    start()
    用于比较和交换,然后将
    join()
    创建到主线程以等待结果。冲洗并重复此
    N
    次。然而,这是非常低效的,因为创建和启动所有
    O(N^2)
    线程的开销对于快速比较和交换来说太大了。
  2. 我们还可以为每个奇数索引创建线程,并使它们在左右之间反复比较和交换。这是有问题的,因为我们必须重复锁定左右(以防止数据竞争),这会导致调度程序产生大量无用的开销,并且我们不知道何时完成。
  3. 最后,我们可以为每个奇数索引创建线程,也让它们在左右之间重复交替,并且每次都让它们等待屏障。对我来说,这似乎是正确的选择,因为它最大限度地减少了线程管理开销,并限制了无用的比较和数据竞争。

这导致我们得到以下代码:

import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class OddEvenSort {
    public static void main(String[] args) {
        int[] arr = {83, 71, 72, 26,  6, 81, 53, 72, 20, 35, 40, 79, 3, 90, 89, 52, 30};
        sortArr(arr);
        System.out.println(Arrays.toString(arr));
    }
    
    public static void sortArr(int[] arr) {
        int threadNum = arr.length/2;
        CyclicBarrier barr = new CyclicBarrier(threadNum);
        Thread[] threads = new Thread[threadNum];
        for (int i = 0; i < threadNum; i++) {
            threads[i] = new Thread(new CompareSwapThread(arr, 2*i + 1, barr));
            threads[i].start();
        }
        for (int i = 0; i < threadNum; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {e.printStackTrace();}
        }
    }
}

class CompareSwapThread implements Runnable {
    private int[] arr;
    private int index;
    private CyclicBarrier barr;
    
    public CompareSwapThread(int[] arr, int index, CyclicBarrier barr) {
        this.arr = arr;
        this.index = index;
        this.barr = barr;
    }
    
    @Override
    public void run() {
        for (int i = 0; i < arr.length; i++) {
            if (arr[index - 1] > arr[index]) {
                int t = arr[index - 1];
                arr[index - 1] = arr[index];
                arr[index] = t;
            }
            try {
                barr.await();
            } catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}
            if (index + 1 < arr.length && arr[index] > arr[index + 1]) {
                int t = arr[index];
                arr[index] = arr[index + 1];
                arr[index + 1] = t;
            }
            try {
                barr.await();
            } catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}
        }
    }   
}

请注意,该算法的运行时间为

O(n)
,这对于此类并行算法来说并不是最佳的。您可以尝试并行实现的另一种算法是 MergeSort 算法。有很多事情可以与此并行化,但最重要的是合并,因为它是顺序算法的瓶颈。您可以查看 Batcher 奇偶合并排序 或查看 其他并行合并

另外,我不知道是否应该使用称为 ExecutorService 的东西,或者只是正常创建线程。

Java 提供了许多不同的并行工具,它们在不同的抽象级别上运行。人们可以说

ExecutorService
比基本线程更“高级”,因为它简化了线程管理。它还将优化任务调度,以便更好地执行。

这是我们的实现,使用

ExecutorService
:

import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class OddEvenSort {
    public static void main(String[] args) {
        int[] arr = {83, 71, 72, 26,  6, 81, 53, 72, 20, 35, 40, 79, 3, 90, 89, 52, 30};
        sortArr(arr);
        System.out.println(Arrays.toString(arr));
    }
    
    public static void sortArr(int[] arr) {
        int threadNum = arr.length/2;
        CyclicBarrier barr = new CyclicBarrier(threadNum);
        ExecutorService exec = Executors.newFixedThreadPool(threadNum);
        Future<?>[] awaits = new Future<?>[threadNum];
        for (int i = 0; i < threadNum; i++) {
            awaits[i] = exec.submit(new CompareSwapThread(arr, 2*i + 1, barr));
        }
        for (int i = 0; i < threadNum; i++) {
            try {
                awaits[i].get();
            } catch (InterruptedException | ExecutionException e) {e.printStackTrace();}
        }
    }
}

class CompareSwapThread implements Runnable {
    private int[] arr;
    private int index;
    private CyclicBarrier barr;
    
    public CompareSwapThread(int[] arr, int index, CyclicBarrier barr) {
        this.arr = arr;
        this.index = index;
        this.barr = barr;
    }
    
    @Override
    public void run() {
        for (int i = 0; i < arr.length; i++) {
            if (arr[index - 1] > arr[index]) {
                int t = arr[index - 1];
                arr[index - 1] = arr[index];
                arr[index] = t;
            }
            try {
                barr.await();
            } catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}
            if (index + 1 < arr.length && arr[index] > arr[index + 1]) {
                int t = arr[index];
                arr[index] = arr[index + 1];
                arr[index + 1] = t;
            }
            try {
                barr.await();
            } catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}
        }
    }   
}

如您所见,我们使用线程 factory

newFixedThreadPool
静态方法来生成和实例化所有线程。然后我们将任务添加到线程池中,这将返回一个
Future
变量。当线程完成时,
Future
将保存该值(在我们的例子中为 null)。调用
Future.get()
方法将等待结果(从而线程完成)。请注意,您想要实现某种嵌套线程并行性(例如,并行化 MergeSort 时)。您应该使用
ForkJoinPool
,因为它是专门为此而设计的。最后,这里是一个关于
ExecutorService
的很好的教程。

© www.soinside.com 2019 - 2024. All rights reserved.