博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【Java并发编程实战】——CyclicBarrier源码分析
阅读量:4181 次
发布时间:2019-05-26

本文共 10631 字,大约阅读时间需要 35 分钟。

CyclicBarrier 是一个同步辅助类,它允许一组线程相互等待,直到全部线程都到达某一公共屏障点(common barrier point),然后所有线程从阻塞中恢复,再一起运行。所有线程释放之后,该 cyclic 可以再次使用。CyclicBarrier 支持一个可选的命令(Runnable),它运行在所有线程到达屏障点之后,在所有线程恢复之前(即由最后一个进入 barrier 的线程执行)。

看 Doug Lea 提供的例子,这里做了修改:

每个 worker 线程处理矩阵的一行,在处理完所有的行之前,该线程将一直在屏障处等待。处理完所有的行之后,将执行所提供的 Runnable 屏障操作。回到 main 线程,并合并这些行(计算数值矩阵所有元素的和)。

public class Solver {
private final int N; private final int[][] data; private final CyclicBarrier barrier; public static void main(String[] args) throws InterruptedException {
int[][] matrix = {
{
1, 1, 1}, {
2, 2}, {
3}, {
4}, {
}}; Solver solver = new Solver(matrix); int sum = 0; for (int i = 0; i < solver.data.length; i++) {
if (solver.data[i].length > 0) {
sum += solver.data[i][0]; } } System.out.println("matrix sum = " + sum); } private class Worker implements Runnable {
int myRow; Worker(int row) {
myRow = row; } public void run() {
int length = data[myRow].length; int sum = 0; //每个线程计算自己所在矩阵的行的和,并将结果放入数组第一个元素中 for (int i = 0; i < length; i++) {
sum += data[myRow][i]; if (i == length - 1) {
data[myRow][0] = sum; } } try {
System.out.println("Thread" + myRow + ":blocking..."); barrier.await(); System.out.println("Thread" + myRow + ":running..."); } catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace(); } } } public Solver(int[][] matrix) throws InterruptedException {
data = matrix; N = matrix.length; //到达屏障点需要执行的命令 Runnable barrierAction = new Runnable() {
public void run() {
System.out.println("barrierAction"); } }; //主要相互等待的参与者数量 parties 为矩阵拥有的行数 barrier = new CyclicBarrier(N, barrierAction); List
threads = new ArrayList
(N); for (int i = 0; i < N; i++) {
Thread thread = new Thread(new Worker(i)); threads.add(thread); thread.start(); } // wait until done for (Thread thread : threads) thread.join(); }}

输出结果:

Thread1:blocking…

Thread0:blocking…
Thread3:blocking…
Thread4:blocking…
Thread2:blocking…
barrierAction
Thread2:running…
Thread1:running…
Thread0:running…
Thread3:running…
Thread4:running…
matrix sum = 14

可以看到线程全部阻塞之后,再执行 barrierAction,之后所有线程再恢复运行,注意线程入队列的顺序是不确定的。

先看 CyclicBarrier 属性

public class CyclicBarrier {
/** * Each use of the barrier is represented as a generation instance. * The generation changes whenever the barrier is tripped, or * is reset. There can be many generations associated with threads * using the barrier - due to the non-deterministic way the lock * may be allocated to waiting threads - but only one of these * can be active at a time (the one to which {@code count} applies) * and all the rest are either broken or tripped. * There need not be an active generation if there has been a break * but no subsequent reset. */ private static class Generation {
boolean broken = false; } /** The lock for guarding barrier entry */ //一个重入锁 private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ //在重入锁上创建一个等待条件 private final Condition trip = lock.newCondition(); /** The number of parties */ //在启动 barrier 前必须调用 await() 的线程数 private final int parties; /* The command to run when tripped */ //越过屏障时可运行的命令,如果不执行任何操作,则该参数为 null private final Runnable barrierCommand; /** The current generation */ //当前 cyclic 使用的实例,里面只有一个表示屏障是否处于损坏的状态 private Generation generation = new Generation(); /** * Number of parties still waiting. Counts down from parties to 0 * on each generation. It is reset to parties on each new * generation or when broken. */ //还有多少个参与者没有进来 private int count; }

构造一个 CyclicBarrier

/** * Creates a new {@code CyclicBarrier} that will trip when the * given number of parties (threads) are waiting upon it, and which * will execute the given barrier action when the barrier is tripped, * performed by the last thread entering the barrier. * * @param parties the number of threads that must invoke {@link #await} *        before the barrier is tripped * @param barrierAction the command to execute when the barrier is *        tripped, or {@code null} if there is no action * @throws IllegalArgumentException if {@code parties} is less than 1 */public CyclicBarrier(int parties, Runnable barrierAction) {
//参与者数量必须大于0 if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; //初始化带加入的参与者数量 this.count = parties; //是否需要在越过屏障时执行的命令,可为null this.barrierCommand = barrierAction;}

每一个线程执行 barrier.await() ,除了最后一个到达的线程(满足越过屏障的条件,已到达的线程数量=parties),它会唤醒之前的所有线程。

public int await() throws InterruptedException, BrokenBarrierException {
try {
//第一个参数表示 是否是自动唤醒,这里为false,一直阻塞 //第二个参数表示 超时多长时间自动唤醒 return dowait(false, 0L); } catch (TimeoutException toe) {
throw new Error(toe); // cannot happen }}/** * Main barrier code, covering the various policies. */private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock; //先获取一个独占锁 //按照上面输出的结果为例,线程获取锁的顺序为 thread1->thread0->thread3->thread4->thread2 lock.lock(); try {
//1.1 thread1先获取锁,记录此时的实例 final Generation g = generation; //判断实例是否被损坏 if (g.broken) throw new BrokenBarrierException(); //判断是否进入方法前被中断 if (Thread.interrupted()) {
//标记 barrier 为损坏状态 breakBarrier(); throw new InterruptedException(); } //每进入一个参与者就将 count 减一,直到减为 0 int index = --count; if (index == 0) {
// tripped //thread2 才会进入这里,因为它是最后一个进入的 //是否运行失败,barrierCommand 有可能会失败,与自己实现的程序有关 boolean ranAction = false; try {
final Runnable command = barrierCommand; //command 不为空才运行,之前初始化的时候给了 barrierAction if (command != null) //barrierAction 会打印 “barrierAction” command.run(); ranAction = true; //thread2 唤醒其他等待的线程,重置屏障 nextGeneration(); return 0; } finally {
//command.run() 失败会到这里 if (!ranAction) //如果 barrierCommand 运行失败,标记 barrier 为损坏状态,并唤醒其他线程 breakBarrier(); } } //thread1->thread0->thread3->thread4 会依次进入到这里 // loop until tripped, broken, interrupted, or timed out for (;;) {
try {
if (!timed) //没有设置超时时间 //thread1->thread0->thread3->thread4 会加入到 AQS 的等待队列中并释放锁 trip.await(); //恢复顺序为 thread1->thread0->thread3->thread4 else if (nanos > 0L) //若设置了超时时间,到点还没有被唤醒就自己从 park 中恢复 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) {
//被中断了的话,判断当前的实例是否还是原来的,是的话实例没有被标记为损坏就标记一次 if (g == generation && ! g.broken) {
//标记 barrier 为损坏状态 breakBarrier(); throw ie; } else {
// We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. //实例已经换了,即使被中断了也要完成等待 Thread.currentThread().interrupt(); } } //线程恢复后再次判断实例是否被损坏 if (g.broken) throw new BrokenBarrierException(); //thread2 成功后会新建一个实例,这里不等说明正常 if (g != generation) //当前线程的到达索引 return index; //屏障点到达前超时会进入这里 if (timed && nanos <= 0L) {
//标记 barrier 为损坏状态 breakBarrier(); throw new TimeoutException(); } } } finally {
lock.unlock(); }}/** * Updates state on barrier trip and wakes up everyone. * Called only while holding lock. */private void nextGeneration() {
// signal completion of last generation //唤醒其他所有线程,这时等待队列的所有节点都会被移动到阻塞队列中 trip.signalAll(); // set up next generation //重置屏障,为啥 CyclicBarrier 可循环使用的原因就在这里 count = parties; generation = new Generation();}/** * Sets current barrier generation as broken and wakes up everyone. * Called only while holding lock. */private void breakBarrier() {
//标志 CyclicBarrier 失败 generation.broken = true; count = parties; //唤醒其他所有线程,这时等待队列的所有节点都会被移动到阻塞队列中 //恢复运行后会在 dowait 中判断实例的状态然后抛异常 trip.signalAll();}

还有另外一个操作,它能够手动重置屏障

/**     * Resets the barrier to its initial state.  If any parties are     * currently waiting at the barrier, they will return with a     * {@link BrokenBarrierException}. Note that resets after     * a breakage has occurred for other reasons can be complicated to     * carry out; threads need to re-synchronize in some other way,     * and choose one to perform the reset.  It may be preferable to     * instead create a new barrier for subsequent use.     */    public void reset() {
final ReentrantLock lock = this.lock; lock.lock(); try {
//为简单处理,标记之前的 barrier 实例为失败,唤醒可能存在阻塞的线程 breakBarrier(); // break the current generation //创建一个新实例 nextGeneration(); // start a new generation } finally {
lock.unlock(); } }

总结下抛 BrokenBarrierException 异常的情况:

线程在等待中被中断或者超时、运行期间调用了 reset()、barrierAction() 执行失败。
抛 InterruptedException 异常的情况: 线程进入 dowait() 方法前被中断。

CountDownLatch 和 CyclicBarrier的区别?

CountDownLatch 是不可以重置的,所以无法重用;而 CyclicBarrier 则没有这种限制,可以重用。
CountDownLatch 的基本操作组合是 countDown/await。调用 await 的线程阻塞等待 countDown 足够的次数,不管你是在一个线程还是多个线程里 countDown,只要次数足够即可。所以就像 Brain Goetz 说过的,CountDownLatch 操作的是事件。
CyclicBarrier 的基本操作组合,则就是 await,当所有的伙伴(parties)都调用了 await,才会继续进行任务,并自动进行重置。注意,正常情况下,CyclicBarrier 的重置都是自动发生的,如果我们调用 reset 方法,但还有线程在等待,就会导致等待线程被打扰,抛出 BrokenBarrierException 异常。CyclicBarrier 侧重点是线程,而不是调用事件,它的典型应用场景是用来等待并发线程结束。

转载地址:http://gmrai.baihongyu.com/

你可能感兴趣的文章
Linux设备驱动--块设备(四)之“自造请求”
查看>>
Nand Flash和Nor Flash相关知识
查看>>
NAND flash和NOR flash的区别
查看>>
writeb(), writew(), writel(),readb(), readw(), readl() 宏函数
查看>>
NOR Flash擦写和原理分析
查看>>
51单片机程序执行流程(STARTUP.A51)
查看>>
原码, 反码, 补码 详解
查看>>
Java自学第一阶段(二)- 小试牛刀
查看>>
Java自学第一阶段(三)- 万能的变量
查看>>
Java自学第一阶段(四)-万能的变量(2)
查看>>
HashMap存储原理以及与hashcode、equals方法的关系
查看>>
python3.6在windows下安装scrapy遇到的问题总结
查看>>
pycharm中打开scrapy项目,import scrapy报错问题
查看>>
scrapy爬取图片,自定义图片下载路径和图片名称
查看>>
python3下import MySQLdb出错问题
查看>>
Maven搭建SSM框架(eclipse)
查看>>
synchronized+Integer模拟火车票预售,出现的问题总结
查看>>
沉浸式过山车,感受巨蚁数字心灵的激情
查看>>
htmlunit爬取js异步加载后的页面
查看>>
修改Linux系统locale设置
查看>>