标准 专业
多元 极客

JDK研究院(2)——CyclicBarrier

我们在使用CyclicBarrier的过程中,已经了解到,CyclicBarrier相当于一个屏障的作用,它需要等待所有线程都到达屏障后,再放开屏障,让所有线程通过。

我们有两种方法创建CyclicBarrier:

public CyclicBarrier(int parties) {
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}
  • parties,相当于一个屏障开关,只有当parties个数的线程到达屏障后,才会释放所有线程。
  • barrierAction,所有线程到达屏障后,首先进行的任务,这个操作将由最后一个到达屏障的线程执行。

由于CyclicBarrier是可以复用的的,所以它引入了一个Generation的概念。

Generation

private static class Generation {
    Generation() {}
    boolean broken;
}
  • broken,当前版本的屏障是否已经打开。

核心变量

成员变量 用途 备注
lock 用于管理栅栏 ReentrantLock
trip 栅栏的竞争条件 Condition
parties 栅栏释放的阈值数  
barrierCommand 达到竞争条件后,需要执行的任务 Runnable
generation 当前版本的栅栏 Generation
count 需要等待的剩余线程个数 每一个版本的栅栏都需要减到0,如果栅栏已经释放,它将会重新赋值为parties大小

核心方法

先来看我们经常用到的一些方法。

await()

调用await()时,证明当前线程已经处理完自己手头上的事情,已经在屏障边就绪了。

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

调用dowait()方法完成线程等待操作。

await(long timeout, TimeUnit unit)

await()方法功能相同,但是当前线程是在指定时间范围内等待。

public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

dowait(boolean timed, long nanos)

await()方法和await(long timeout, TimeUnit unit)均会调用dowait()方法完成线程等待。

private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    // 获取重入锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 获取屏障的版本
        final Generation g = generation;
        // 如果当前版本已经失效,则抛出BrokenBarrierException异常
        if (g.broken)
            throw new BrokenBarrierException();
        // 如果当前线程已经处于中断状态,则这一版本的屏障不可能达到释放开关,所以需要重置版本
        // 重置版本后,抛出InterruptedException异常
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        // 当前线程到达屏障前后,计算还没有到达屏障前的线程数
        int index = --count;
        // 如果当前线程是最后一个到达屏障前的
        if (index == 0) {
            boolean ranAction = false;
            try {
                // 最后一个到达屏障前的线程需要执行释放所有线程之钱的一个任务
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                // 无论是否有释放前的任务,正常情况下均视为执行成功
                ranAction = true;
                // 开启下一版本的屏障
                nextGeneration();
                return 0;
            } finally {
                // 如果执行失败,也会废弃当前版本屏障,开启下一个版本
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 进入自旋
        // 由于最后一个到达屏障的线程在上文中已经开启下一个版本的屏障,并返回0,故不会进入下文中的自旋
        for (;;) {
            try {
                // 非限定时间策略下,释放锁,进入等待线程
                if (!timed)
                    trip.await();
                // 如果等待时间>0,则线程等待当前时间
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 等待时当前线程出现中断异常,当前屏障版本失效,开启下一个版本
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    // 如果当前版本仍然正常运行,或者版本发生了错乱,我们需要中断当前线程的执行
                    Thread.currentThread().interrupt();
                }
            }
            // 如果当前屏障版本已经失效,则抛出BrokenBarrierException异常
            if (g.broken)
                throw new BrokenBarrierException();
            // 如果当前版本已经更新换代了,返回当前剩余未到达屏障的线程数量
            if (g != generation)
                return index;
            // 在使用限定时间策略下,传入了一个不合理的时间,则当前版本异常,开启下一个版本屏障,并抛出TimeOutException异常
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 释放锁
        lock.unlock();
    }
}
  1. 获取当前屏障版本,如果当前屏障版本已经失效,则抛出BrokenBarrierException异常。
  2. 如果当前线程已经处于中断状态,那么当前版本的屏障将不可能开启释放开关,所以就需要重置版本,并抛出InterruptedException异常。
  3. 经过上面的异常过滤,当前线程到达屏障前,计算剩余还没有到达屏障前的线程。
  4. 如果当前线程是最后一个到达屏障前的线程,则需要执行预设的所有线程到达屏障前时首先要执行的任务,获取任务执行结果,即使没有任务,也视为执行任务成功,并开启下一个版本屏障,并返回0。
  5. 如果任务执行失败,也会开启下一个版本的屏障。
  6. 在当前线程不是最后一个到达屏障前的线程,或者当前线程是最后一个到达屏障前的线程,但是执行首要任务失败了,均会进入自旋状态。
  7. 根据限定时间策略,取决于是一直阻塞等待还是阻塞等待一段时间。
  8. 如果在阻塞等待过程中,发生了中断异常,或者超过了指定的阻塞时间,开启下一个版本屏障或者中断当前线程。
  9. 接下来是一些异常情况处理:
    1. 如果当前版本屏障已经失效了,则抛出BrokenBarrierException异常。
    2. 如果当前版本已经更新换代了,则返回当前版本剩余还未到达屏障前的线程数量。
    3. 在使用限定时间策略情况下,传入了一个不合理的时间,则代表当前版本出现了异常,需要开启下一个版本屏障,并抛出TimeOutException异常

dowait()方法将在线程安全的环境下进行。

我们如何开启下一个版本的屏障呢?

nextGeneration()

nextGeneration()方法用于我们通过调用nextGeneration()开启下一个新版本的屏障,当前版本的屏障将会被废弃掉。

private void nextGeneration() {
    // 上一个版本的屏障已经结束,唤起所有等待的线程
    trip.signalAll();
	// 设置下一个版本的屏障
    // count重新设置为parties大小
    count = parties;
    // 重新创建一个新的版本,并赋值给generation成员变量
    generation = new Generation();
}

开启下一个版本的屏障代表上一个版本的屏障已经结束,我们需要唤起所有等待的线程,同时将count归为parties,创建一个新的版本。

breakBarrier()

breakBarrier()方法用于将当前版本的屏障置为失效。

private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}
  1. 当前屏障版本置为失效。
  2. 重置等待线程的数量为屏障释放阈值数量。
  3. 唤起所有在屏障前处于等待状态的线程。

isBroken()

isBroken()方法用于判断当前版本的屏障是否已经处于失效状态,此方法是线程安全的。

public boolean isBroken() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 返回当前屏障版本的状态
        return generation.broken;
    } finally {
        lock.unlock();
    }
}

getParties()

getParties()方法用于获取当前屏障释放需要达到的阈值数。

public int getParties() {
    return parties;
}

reset()

reset()方法用于重置屏障版本。

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();
        nextGeneration();
    } finally {
        lock.unlock();
    }
}
  1. 首先将当前版本的屏障置为失效。
  2. 开启下一个版本的屏障。

reset()方法将在线程安全的情况下进行。

getNumberWaiting()

getNumberWaiting()获取目前处于屏障前等待屏障释放的线程数量,是一个用于debug或者断言使用的好方法。

public int getNumberWaiting() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return parties - count;
    } finally {
        lock.unlock();
    }
}

getNumberWaiting()方法将在线程安全的情况下进行。

赞(4) 投币

评论 抢沙发

慕勋的实验室慕勋的研究院

码字不容易,路过请投币

支付宝扫一扫

微信扫一扫