如何针对两个不同的任务集正确使用CyclicBarrier的循环行为?

问题描述 投票:3回答:2

CyclicBarrier在最后一个线程进入屏障时执行屏障动作。

如果在CyclicBarrier中定义了5个方(线程),则当第5个线程(任务)进入屏障时,屏障将跳闸(即将重置)并执行屏障动作。

这里,第5个线程是什么类型都没有关系。它可以是任何任务。

所以,我的问题是:

  1. 如果有两组任务(每组5个线程),那么如何确保首先执行一组特定的任务,然后执行屏障动作命令。然后剩下的任务集将被执行,然后屏障操作命令将再次被执行。

  2. CyclicBarrier是否适用于此类情况。如果不是,那么如何在现实情况下正确使用其循环行为。

以下是CyclicBarrier代码。

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;


public class CyclicBarrierSimpleExample {
    static int barrierActionThreadCount;
    public static void main(String[] args){
        // 5 is the number of parties. So, when the 5th thread will enter the barrier, barrier gets tripped or reset and BarrierAction will be called. 
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new BarrierAction(barrierActionThreadCount));

        for(int i=0;i<5;i++){
            Thread validationTask = new Thread(new ValidationTask(i, cyclicBarrier));
            validationTask.start();
        }

        for(int i=0;i<5;i++){
            Thread serviceTask = new Thread(new ServiceTask(i, cyclicBarrier));
            serviceTask.start();
        }
    }
}

class BarrierAction implements Runnable{
    private int barrierActionThreadCount;
    public BarrierAction(int barrierActionThreadCount) {
        this.barrierActionThreadCount=barrierActionThreadCount;
    }
    // Barrier action will execute when barrier is reached i.e. number of parties waiting got executed
    // In this case, it will trip when 5 different threaValidationTaskds are called and then again its number of parties will reset to 5
    @Override
    public void run() {
        this.barrierActionThreadCount++;
        System.out.println("Barrier action thread got executed "+barrierActionThreadCount+" times");
    }

}


class ValidationTask implements Runnable{
    CyclicBarrier cyclicBarrier; 
    int threadNum;
    public ValidationTask(int threadNum, CyclicBarrier cyclicBarrier) {
        this.threadNum = threadNum;
        this.cyclicBarrier = cyclicBarrier;
    }
    @Override
    public void run() {
        try {
            Thread.sleep(threadNum*1000);
        } catch (InterruptedException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }
        System.out.println("Validation Task: Thread-"+threadNum+" got executed");
        try {

            cyclicBarrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    }

}

class ServiceTask implements Runnable{
    CyclicBarrier cyclicBarrier; 
    int threadNum;
    public ServiceTask(int threadNum, CyclicBarrier cyclicBarrier) {
        this.threadNum = threadNum;
        this.cyclicBarrier = cyclicBarrier;
    }
    @Override
    public void run() {
        System.out.println("Service Task: Thread-"+threadNum+" got executed");
        try {
            cyclicBarrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

    }

}

以上任务的输出:

Validation Task: Thread-0 got executed
Service Task: Thread-1 got executed
Service Task: Thread-0 got executed
Service Task: Thread-4 got executed
Service Task: Thread-2 got executed
Service Task: Thread-3 got executed
Barrier action thread got executed 1 times
Validation Task: Thread-1 got executed
Validation Task: Thread-2 got executed
Validation Task: Thread-3 got executed
Validation Task: Thread-4 got executed
Barrier action thread got executed 2 times

我期望的行为是,应首先执行所有验证任务,然后再执行服务任务。

感谢您的帮助。

java multithreading concurrency synchronization
2个回答
2
投票

第一个问题的答案:如Andreas所述,CountDownlatch可用于您提到的方案。

第二个问题的答案:下面是使用CyclicBarrier的循环行为的代码。

在此示例中,有5个线程正在使用CyclicBarrier并多次调用await()方法以到达并在多个检查点同时等待。

 package dev.cyclicbarrier;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBehaviorDemo {

    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {

            public void run() {
                System.out.println("All threads Arrived at barrier Checkpoint");
            }

        });

        Thread t1= new Thread(new Task(barrier),"thread-1");
        Thread t2= new Thread(new Task(barrier),"thread-2");
        Thread t3= new Thread(new Task(barrier),"thread-3");
        Thread t4= new Thread(new Task(barrier),"thread-4");
        Thread t5= new Thread(new Task(barrier),"thread-5");

        t1.start();
        t2.start();
        t3.start();
        t4.start();
        t5.start();
    }

}

class Task implements Runnable {
    CyclicBarrier barrier;

    Task(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    public void run() {
        try {
            System.out.println(Thread.currentThread().getName() + " has started for checkpoint 1");
            barrier.await();

            System.out.println(Thread.currentThread().getName() + " has started for checkpoint 2");
            barrier.await();

            System.out.println(Thread.currentThread().getName() + " has started for checkpoint 3");
            barrier.await();

            System.out.println(Thread.currentThread().getName() + "has finished");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

}

上述程序的输出为

    thread-2 has started for checkpoint 1
    thread-4 has started for checkpoint 1
    thread-1 has started for checkpoint 1
    thread-3 has started for checkpoint 1
    thread-5 has started for checkpoint 1
    All threads Arrived at barrier Checkpoint
    thread-2 has started for checkpoint 2
    thread-4 has started for checkpoint 2
    thread-5 has started for checkpoint 2
    thread-1 has started for checkpoint 2
    thread-3 has started for checkpoint 2
    All threads Arrived at barrier Checkpoint
    thread-3 has started for checkpoint 3
    thread-2 has started for checkpoint 3
    thread-4 has started for checkpoint 3
    thread-5 has started for checkpoint 3
    thread-1 has started for checkpoint 3
    All threads Arrived at barrier Checkpoint
    thread-1has finished
    thread-3has finished
    thread-2has finished
    thread-4has finished
    thread-5has finished

3
投票

您不会使用大小为5的屏障来控制10个线程。

您使用大小为5的屏障来控制5个线程,其中每个线程执行一系列操作,等待所有线程完成一个步骤,然后再继续下一步。

如果要启动10个线程,并让5个ServiceTask线程等待5个ValidationTask线程完成,请使用CountDownLatch,将其同时分配给CountDownLatchValidationTask

然后ServiceTask应该首先调用ServiceTaskawait​()应该最后调用ValidationTask。>>

这样,countDown​()不会在所有ServiceTask完成之前运行,并且ValidationTask可以在完成后立即单独停止,而不必在停止之前都互相等待。

© www.soinside.com 2019 - 2024. All rights reserved.