本文共 10631 字,大约阅读时间需要 35 分钟。
CyclicBarrier 是一个同步辅助类,它允许一组线程相互等待,直到全部线程都到达某一公共屏障点(common barrier point),然后所有线程从阻塞中恢复,再一起运行。所有线程释放之后,该 cyclic 可以再次使用。CyclicBarrier 支持一个可选的命令(Runnable),它运行在所有线程到达屏障点之后,在所有线程恢复之前(即由最后一个进入 barrier 的线程执行)。
看 Doug Lea 提供的例子,这里做了修改:
每个 worker 线程处理矩阵的一行,在处理完所有的行之前,该线程将一直在屏障处等待。处理完所有的行之后,将执行所提供的 Runnable 屏障操作。回到 main 线程,并合并这些行(计算数值矩阵所有元素的和)。public class Solver { private final int N; private final int[][] data; private final CyclicBarrier barrier; public static void main(String[] args) throws InterruptedException { int[][] matrix = { { 1, 1, 1}, { 2, 2}, { 3}, { 4}, { }}; Solver solver = new Solver(matrix); int sum = 0; for (int i = 0; i < solver.data.length; i++) { if (solver.data[i].length > 0) { sum += solver.data[i][0]; } } System.out.println("matrix sum = " + sum); } private class Worker implements Runnable { int myRow; Worker(int row) { myRow = row; } public void run() { int length = data[myRow].length; int sum = 0; //每个线程计算自己所在矩阵的行的和,并将结果放入数组第一个元素中 for (int i = 0; i < length; i++) { sum += data[myRow][i]; if (i == length - 1) { data[myRow][0] = sum; } } try { System.out.println("Thread" + myRow + ":blocking..."); barrier.await(); System.out.println("Thread" + myRow + ":running..."); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } } public Solver(int[][] matrix) throws InterruptedException { data = matrix; N = matrix.length; //到达屏障点需要执行的命令 Runnable barrierAction = new Runnable() { public void run() { System.out.println("barrierAction"); } }; //主要相互等待的参与者数量 parties 为矩阵拥有的行数 barrier = new CyclicBarrier(N, barrierAction); Listthreads = new ArrayList (N); for (int i = 0; i < N; i++) { Thread thread = new Thread(new Worker(i)); threads.add(thread); thread.start(); } // wait until done for (Thread thread : threads) thread.join(); }}
输出结果:
Thread1:blocking…
Thread0:blocking… Thread3:blocking… Thread4:blocking… Thread2:blocking… barrierAction Thread2:running… Thread1:running… Thread0:running… Thread3:running… Thread4:running… matrix sum = 14
可以看到线程全部阻塞之后,再执行 barrierAction,之后所有线程再恢复运行,注意线程入队列的顺序是不确定的。
先看 CyclicBarrier 属性
public class CyclicBarrier { /** * Each use of the barrier is represented as a generation instance. * The generation changes whenever the barrier is tripped, or * is reset. There can be many generations associated with threads * using the barrier - due to the non-deterministic way the lock * may be allocated to waiting threads - but only one of these * can be active at a time (the one to which {@code count} applies) * and all the rest are either broken or tripped. * There need not be an active generation if there has been a break * but no subsequent reset. */ private static class Generation { boolean broken = false; } /** The lock for guarding barrier entry */ //一个重入锁 private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ //在重入锁上创建一个等待条件 private final Condition trip = lock.newCondition(); /** The number of parties */ //在启动 barrier 前必须调用 await() 的线程数 private final int parties; /* The command to run when tripped */ //越过屏障时可运行的命令,如果不执行任何操作,则该参数为 null private final Runnable barrierCommand; /** The current generation */ //当前 cyclic 使用的实例,里面只有一个表示屏障是否处于损坏的状态 private Generation generation = new Generation(); /** * Number of parties still waiting. Counts down from parties to 0 * on each generation. It is reset to parties on each new * generation or when broken. */ //还有多少个参与者没有进来 private int count; }
构造一个 CyclicBarrier
/** * Creates a new {@code CyclicBarrier} that will trip when the * given number of parties (threads) are waiting upon it, and which * will execute the given barrier action when the barrier is tripped, * performed by the last thread entering the barrier. * * @param parties the number of threads that must invoke {@link #await} * before the barrier is tripped * @param barrierAction the command to execute when the barrier is * tripped, or {@code null} if there is no action * @throws IllegalArgumentException if {@code parties} is less than 1 */public CyclicBarrier(int parties, Runnable barrierAction) { //参与者数量必须大于0 if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; //初始化带加入的参与者数量 this.count = parties; //是否需要在越过屏障时执行的命令,可为null this.barrierCommand = barrierAction;}
每一个线程执行 barrier.await() ,除了最后一个到达的线程(满足越过屏障的条件,已到达的线程数量=parties),它会唤醒之前的所有线程。
public int await() throws InterruptedException, BrokenBarrierException { try { //第一个参数表示 是否是自动唤醒,这里为false,一直阻塞 //第二个参数表示 超时多长时间自动唤醒 return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen }}/** * Main barrier code, covering the various policies. */private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; //先获取一个独占锁 //按照上面输出的结果为例,线程获取锁的顺序为 thread1->thread0->thread3->thread4->thread2 lock.lock(); try { //1.1 thread1先获取锁,记录此时的实例 final Generation g = generation; //判断实例是否被损坏 if (g.broken) throw new BrokenBarrierException(); //判断是否进入方法前被中断 if (Thread.interrupted()) { //标记 barrier 为损坏状态 breakBarrier(); throw new InterruptedException(); } //每进入一个参与者就将 count 减一,直到减为 0 int index = --count; if (index == 0) { // tripped //thread2 才会进入这里,因为它是最后一个进入的 //是否运行失败,barrierCommand 有可能会失败,与自己实现的程序有关 boolean ranAction = false; try { final Runnable command = barrierCommand; //command 不为空才运行,之前初始化的时候给了 barrierAction if (command != null) //barrierAction 会打印 “barrierAction” command.run(); ranAction = true; //thread2 唤醒其他等待的线程,重置屏障 nextGeneration(); return 0; } finally { //command.run() 失败会到这里 if (!ranAction) //如果 barrierCommand 运行失败,标记 barrier 为损坏状态,并唤醒其他线程 breakBarrier(); } } //thread1->thread0->thread3->thread4 会依次进入到这里 // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) //没有设置超时时间 //thread1->thread0->thread3->thread4 会加入到 AQS 的等待队列中并释放锁 trip.await(); //恢复顺序为 thread1->thread0->thread3->thread4 else if (nanos > 0L) //若设置了超时时间,到点还没有被唤醒就自己从 park 中恢复 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { //被中断了的话,判断当前的实例是否还是原来的,是的话实例没有被标记为损坏就标记一次 if (g == generation && ! g.broken) { //标记 barrier 为损坏状态 breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. //实例已经换了,即使被中断了也要完成等待 Thread.currentThread().interrupt(); } } //线程恢复后再次判断实例是否被损坏 if (g.broken) throw new BrokenBarrierException(); //thread2 成功后会新建一个实例,这里不等说明正常 if (g != generation) //当前线程的到达索引 return index; //屏障点到达前超时会进入这里 if (timed && nanos <= 0L) { //标记 barrier 为损坏状态 breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); }}/** * Updates state on barrier trip and wakes up everyone. * Called only while holding lock. */private void nextGeneration() { // signal completion of last generation //唤醒其他所有线程,这时等待队列的所有节点都会被移动到阻塞队列中 trip.signalAll(); // set up next generation //重置屏障,为啥 CyclicBarrier 可循环使用的原因就在这里 count = parties; generation = new Generation();}/** * Sets current barrier generation as broken and wakes up everyone. * Called only while holding lock. */private void breakBarrier() { //标志 CyclicBarrier 失败 generation.broken = true; count = parties; //唤醒其他所有线程,这时等待队列的所有节点都会被移动到阻塞队列中 //恢复运行后会在 dowait 中判断实例的状态然后抛异常 trip.signalAll();}
还有另外一个操作,它能够手动重置屏障
/** * Resets the barrier to its initial state. If any parties are * currently waiting at the barrier, they will return with a * {@link BrokenBarrierException}. Note that resets after * a breakage has occurred for other reasons can be complicated to * carry out; threads need to re-synchronize in some other way, * and choose one to perform the reset. It may be preferable to * instead create a new barrier for subsequent use. */ public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { //为简单处理,标记之前的 barrier 实例为失败,唤醒可能存在阻塞的线程 breakBarrier(); // break the current generation //创建一个新实例 nextGeneration(); // start a new generation } finally { lock.unlock(); } }
总结下抛 BrokenBarrierException 异常的情况:
线程在等待中被中断或者超时、运行期间调用了 reset()、barrierAction() 执行失败。 抛 InterruptedException 异常的情况: 线程进入 dowait() 方法前被中断。CountDownLatch 和 CyclicBarrier的区别?
CountDownLatch 是不可以重置的,所以无法重用;而 CyclicBarrier 则没有这种限制,可以重用。 CountDownLatch 的基本操作组合是 countDown/await。调用 await 的线程阻塞等待 countDown 足够的次数,不管你是在一个线程还是多个线程里 countDown,只要次数足够即可。所以就像 Brain Goetz 说过的,CountDownLatch 操作的是事件。 CyclicBarrier 的基本操作组合,则就是 await,当所有的伙伴(parties)都调用了 await,才会继续进行任务,并自动进行重置。注意,正常情况下,CyclicBarrier 的重置都是自动发生的,如果我们调用 reset 方法,但还有线程在等待,就会导致等待线程被打扰,抛出 BrokenBarrierException 异常。CyclicBarrier 侧重点是线程,而不是调用事件,它的典型应用场景是用来等待并发线程结束。转载地址:http://gmrai.baihongyu.com/