我们在使用CyclicBarrier的过程中,已经了解到,CyclicBarrier相当于一个屏障的作用,它需要等待所有线程都到达屏障后,再放开屏障,让所有线程通过。
我们有两种方法创建CyclicBarrier:
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
- parties,相当于一个屏障开关,只有当parties个数的线程到达屏障后,才会释放所有线程。
- barrierAction,所有线程到达屏障后,首先进行的任务,这个操作将由最后一个到达屏障的线程执行。
由于CyclicBarrier是可以复用的的,所以它引入了一个Generation的概念。
Generation
private static class Generation {
Generation() {}
boolean broken;
}
- broken,当前版本的屏障是否已经打开。
核心变量
成员变量 | 用途 | 备注 |
---|---|---|
lock | 用于管理栅栏 | ReentrantLock |
trip | 栅栏的竞争条件 | Condition |
parties | 栅栏释放的阈值数 | |
barrierCommand | 达到竞争条件后,需要执行的任务 | Runnable |
generation | 当前版本的栅栏 | Generation |
count | 需要等待的剩余线程个数 | 每一个版本的栅栏都需要减到0,如果栅栏已经释放,它将会重新赋值为parties大小 |
核心方法
先来看我们经常用到的一些方法。
await()
调用await()时,证明当前线程已经处理完自己手头上的事情,已经在屏障边就绪了。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
调用dowait()方法完成线程等待操作。
await(long timeout, TimeUnit unit)
和await()方法功能相同,但是当前线程是在指定时间范围内等待。
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
dowait(boolean timed, long nanos)
await()方法和await(long timeout, TimeUnit unit)均会调用dowait()方法完成线程等待。
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 获取重入锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取屏障的版本
final Generation g = generation;
// 如果当前版本已经失效,则抛出BrokenBarrierException异常
if (g.broken)
throw new BrokenBarrierException();
// 如果当前线程已经处于中断状态,则这一版本的屏障不可能达到释放开关,所以需要重置版本
// 重置版本后,抛出InterruptedException异常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 当前线程到达屏障前后,计算还没有到达屏障前的线程数
int index = --count;
// 如果当前线程是最后一个到达屏障前的
if (index == 0) {
boolean ranAction = false;
try {
// 最后一个到达屏障前的线程需要执行释放所有线程之钱的一个任务
final Runnable command = barrierCommand;
if (command != null)
command.run();
// 无论是否有释放前的任务,正常情况下均视为执行成功
ranAction = true;
// 开启下一版本的屏障
nextGeneration();
return 0;
} finally {
// 如果执行失败,也会废弃当前版本屏障,开启下一个版本
if (!ranAction)
breakBarrier();
}
}
// 进入自旋
// 由于最后一个到达屏障的线程在上文中已经开启下一个版本的屏障,并返回0,故不会进入下文中的自旋
for (;;) {
try {
// 非限定时间策略下,释放锁,进入等待线程
if (!timed)
trip.await();
// 如果等待时间>0,则线程等待当前时间
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 等待时当前线程出现中断异常,当前屏障版本失效,开启下一个版本
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// 如果当前版本仍然正常运行,或者版本发生了错乱,我们需要中断当前线程的执行
Thread.currentThread().interrupt();
}
}
// 如果当前屏障版本已经失效,则抛出BrokenBarrierException异常
if (g.broken)
throw new BrokenBarrierException();
// 如果当前版本已经更新换代了,返回当前剩余未到达屏障的线程数量
if (g != generation)
return index;
// 在使用限定时间策略下,传入了一个不合理的时间,则当前版本异常,开启下一个版本屏障,并抛出TimeOutException异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
// 释放锁
lock.unlock();
}
}
- 获取当前屏障版本,如果当前屏障版本已经失效,则抛出BrokenBarrierException异常。
- 如果当前线程已经处于中断状态,那么当前版本的屏障将不可能开启释放开关,所以就需要重置版本,并抛出InterruptedException异常。
- 经过上面的异常过滤,当前线程到达屏障前,计算剩余还没有到达屏障前的线程。
- 如果当前线程是最后一个到达屏障前的线程,则需要执行预设的所有线程到达屏障前时首先要执行的任务,获取任务执行结果,即使没有任务,也视为执行任务成功,并开启下一个版本屏障,并返回0。
- 如果任务执行失败,也会开启下一个版本的屏障。
- 在当前线程不是最后一个到达屏障前的线程,或者当前线程是最后一个到达屏障前的线程,但是执行首要任务失败了,均会进入自旋状态。
- 根据限定时间策略,取决于是一直阻塞等待还是阻塞等待一段时间。
- 如果在阻塞等待过程中,发生了中断异常,或者超过了指定的阻塞时间,开启下一个版本屏障或者中断当前线程。
- 接下来是一些异常情况处理:
- 如果当前版本屏障已经失效了,则抛出BrokenBarrierException异常。
- 如果当前版本已经更新换代了,则返回当前版本剩余还未到达屏障前的线程数量。
- 在使用限定时间策略情况下,传入了一个不合理的时间,则代表当前版本出现了异常,需要开启下一个版本屏障,并抛出TimeOutException异常
dowait()方法将在线程安全的环境下进行。
我们如何开启下一个版本的屏障呢?
nextGeneration()
nextGeneration()方法用于我们通过调用nextGeneration()开启下一个新版本的屏障,当前版本的屏障将会被废弃掉。
private void nextGeneration() {
// 上一个版本的屏障已经结束,唤起所有等待的线程
trip.signalAll();
// 设置下一个版本的屏障
// count重新设置为parties大小
count = parties;
// 重新创建一个新的版本,并赋值给generation成员变量
generation = new Generation();
}
开启下一个版本的屏障代表上一个版本的屏障已经结束,我们需要唤起所有等待的线程,同时将count归为parties,创建一个新的版本。
breakBarrier()
breakBarrier()方法用于将当前版本的屏障置为失效。
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
- 当前屏障版本置为失效。
- 重置等待线程的数量为屏障释放阈值数量。
- 唤起所有在屏障前处于等待状态的线程。
isBroken()
isBroken()方法用于判断当前版本的屏障是否已经处于失效状态,此方法是线程安全的。
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 返回当前屏障版本的状态
return generation.broken;
} finally {
lock.unlock();
}
}
getParties()
getParties()方法用于获取当前屏障释放需要达到的阈值数。
public int getParties() {
return parties;
}
reset()
reset()方法用于重置屏障版本。
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier();
nextGeneration();
} finally {
lock.unlock();
}
}
- 首先将当前版本的屏障置为失效。
- 开启下一个版本的屏障。
reset()方法将在线程安全的情况下进行。
getNumberWaiting()
getNumberWaiting()获取目前处于屏障前等待屏障释放的线程数量,是一个用于debug或者断言使用的好方法。
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
getNumberWaiting()方法将在线程安全的情况下进行。