使用CyclicBarrier的Java循环线程

问题描述 投票:1回答:3

我有一个具有这种常规结构的程序:

init
create CyclicBarrier
initialise all threads, attaching to barrier
*start all threads*
wait for join
display stats


*start all threads*
perform calculation
await barrier

我的问题是我需要线程的run()方法来保持循环,直到满足特定条件为止,但是在每次迭代后都暂停以让所有线程同步。

我已经尝试将Runnable方法附加到屏障,但这最终需要重新创建和重新启动每个线程,这不是一个很好的解决方案。

我也尝试过使用CyclicBarrier的reset()方法,但这似乎会在现有线程上引起错误,即使在所有线程完成之后执行也是如此。

我的问题是:

-是否可以“重置”屏障并让所有屏障的线程都遵循与第一次调用await()之前相同的条件?

-或者我应该使用另一种方法来实现这一目标吗?

提前感谢

java multithreading wait barrier cyclicbarrier
3个回答
1
投票

跟随@Totoro的回答,下面是一些示例代码,其中还包含了以下要求:“我需要线程的run()方法来保持循环,直到满足特定条件为止,在每次迭代后暂停以让所有线程同步” 。这使它非常复杂,但是希望程序输出可以阐明示例代码(或者我应该做更好的示例)。

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class BarrierCalc implements Runnable {

public static final int CALC_THREADS = 3;

private static final AtomicBoolean runCondition = new AtomicBoolean();
private static final AtomicBoolean stopRunning = new AtomicBoolean();

public static void main(String[] args) {

    CyclicBarrier barrier = new CyclicBarrier(CALC_THREADS + 1);
    for (int i = 0; i < CALC_THREADS; i++) {
         new Thread(new BarrierCalc(barrier)).start();
    }
    try {
        runCondition.set(true);
        barrier.await();
        showln(0, "STATS!");

        barrier.await();
        showln(0, "start looping 1");
        Thread.sleep(200);
        runCondition.set(false);
        showln(0, "stop looping 1");
        barrier.await();
        runCondition.set(true);

        barrier.await();
        showln(0, "start looping 2");
        Thread.sleep(100);
        runCondition.set(false);
        showln(0, "stop looping 2");
        barrier.await();

        stopRunning.set(true);
        showln(0, "finishing");
        barrier.await();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

private static final AtomicInteger calcId = new AtomicInteger();

private CyclicBarrier barrier;
private int id;

public BarrierCalc(CyclicBarrier barrier) {
    this.barrier = barrier;
    id = calcId.incrementAndGet();
}

public void run() {

    showln(id, "waiting for start");
    try {
        barrier.await(); // display stats
        barrier.await(); // start running
        int loopNumber = 0;
        while (!stopRunning.get()) {
            showln(id, "looping " + (++loopNumber));
            while (runCondition.get()) {
                Thread.sleep(10); // simulate looping
            }
            showln(id, "synchronizing " + loopNumber);
            barrier.await();
            showln(id, "synchronized " + loopNumber);
            // give main thread a chance to set stopCondition and runCondition
            barrier.await();
        }
        showln(id, "finished");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

private static final long START_TIME = System.currentTimeMillis();

public static void showln(int id, String msg) {
    System.out.println((System.currentTimeMillis() - START_TIME) + "\t ID " + id + ": " + msg);
}

}

请记住,程序输出可能未按预期的顺序:同时向一个同步输出(System.out)写入的线程将以随机顺序进行写访问。


2
投票

barrier.wait()将挂起线程。屏障已经在主线程中,它不需要另一个。在上面的算法中,显示了显示统计信息后正在重新启动的线程。您不需要这样做。如果最近唤醒的线程处于循环中,则它们将再次返回barrier.wait()。


0
投票

[您可以看一下我使用CyclicBarrier的示例。在此示例中,每个工作人员都进行了一些计算,并在障碍处检查了条件。如果满足条件,则所有工作人员都将停止计算,否则他们将继续:

class Solver {
    private static final int REQUIRED_AMOUNT = 100;
    private static final int NUMBER_OF_THREADS = 4;

    AtomicInteger atomicInteger = new AtomicInteger();
    AtomicBoolean continueCalculation = new AtomicBoolean(true);
    final CyclicBarrier barrier;

    public static void main(String[] args) {
        new Solver();
    }

    class Worker implements Runnable {
        int workerId;
        Worker(int workerId) {
            this.workerId = workerId;
        }

        public void run() {
            try {
                while(continueCalculation.get()) {
                    calculate(workerId);
                    barrier.await();
                }

            } catch (Exception ex) {
                System.out.println("Finishing " + workerId);
            }
        }
    }

    public Solver() {
        Runnable barrierAction = () -> {
            if (done()) {
                continueCalculation.set(false);
            }
        };

        barrier = new CyclicBarrier(NUMBER_OF_THREADS, barrierAction);

        List<Thread> threads = new ArrayList(NUMBER_OF_THREADS);
        for (int i = 0; i < NUMBER_OF_THREADS; i++) {
            Thread thread = new Thread(new Worker(i));
            threads.add(thread);
            thread.start();
        }
    }

    private void calculate(int workerId) throws InterruptedException {
        // Some long-running calculation
        Thread.sleep(2000L);
        int r = new Random().nextInt(12);

        System.out.println("Worker #" + workerId + " added " + r +" = " + atomicInteger.addAndGet(r));
    }

    private boolean done() {
        int currentResult = atomicInteger.get();
        boolean collected = currentResult >= REQUIRED_AMOUNT;

        System.out.println("=======================================================");
        System.out.println("Checking state at the barrier: " + currentResult);
        if (collected) {
            System.out.println("Required result is reached");
        }
        System.out.println("=======================================================");

        return collected;
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.