CountDownLatch源码分析

CountDownLatch中文一般译作“闭锁”,是一种使用广泛的并发辅助类,用于需要一个线程或多个线程等待其他线程中的操作完成的场景。

在初始化CountDownLatch的时候,需要给定一个count的值。调用await之后,线程将会阻塞,直到count的值变为0。在count已经变为0的情况下,后续如果还有线程调用await的时候,线程调用将立即返回。

CountDownLatch的使用是一次性的,当count变为0之后,将没有变法更改count的值。如果有将count重置为初始值的需求,请使用CyclicBarrier。

CountDownLatch的典型用法如下

class Driver {
    void main() throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doenSignal = new CountDownLatch(N);
        
        for (int i = 0; i < N; i++) {
            new Thread(new Worker(startSignal, doneSignal)).start();
        }
        
        doSomethingElse(); // don't let run yet
        startSignal.countDown(); // let all threads processed
        doSomethingElse();
        doneSignal.await(); // wait for all to finish
    }
}

class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;
    
    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }
    
    public void run() {
        try {
            startSignal.await();
            doWork();
            doneSignal.countDown();
        } catch (InterruptedException ex) {} // return;
    }
}

下面来具体分析以下其代码实现。

public class CountDownLatch {
    private final Sync sync;
}

从声明中可以看出,CountDownLatch和ReentrantLock一样,也是通过组合sync来实现锁状态的管理,而不是通过继承AbstractQueuedSynchronizer来实现。关于AQS的具体实现,我们后面再进行专门的总结。

其中,Sync的核心实现如下

private static final class Sync extends AbstractQueuedSynchronizer {
    protected int tryAcquireShared(int acqires) {
        return (getState() == 0) ? 1 : -1;
    }
    
    protected boolean tryReleaseShared(int releases) {
        for (;;) {
            int c = getState();
            if (c == 0) {
                return false;
            }
            int nextc = c - 1;
            if (compareAndSetState(c, nextc)) {
                return enxtc == 0;
            }
        }
    }
}

上文提到,线程在调用await方法的时候,如果count的数量为0,则线程调用立即返回。不为0的话,则线程进入阻塞状态。其实现就是依托于能否获取到锁。我们知道,AQS是依托于内部的一个状态变量state来实现的。在tryAcquireShared中,判断getState()是否为0,在为0的时候,则表示线程可以获取到锁。在tryReleaseShared中,则是以cas自旋的方式来将state的值减少1。

在CountDownLatch类中,核心的接口方法包括await和countDown方法。其实,await方法,包括几个版本:无参数的await方法,可设置超时时间的await方法。无参的await方法调用的是AQS的acquireSharedInterruptibly方法,从方法名字中可以看出await方法是可以响应线程中断的。而可设置超时时间的await方法,调用的是AQS的tryAcquireSharedNanos方法。对于这两个方法的实现,就放到对AQS分析的总结文章中一并来进行说明。

总体来说,CountDownLatch的代码较为简单明了,其内部实现主要是依赖AQS来进行。虽然代码简单,但CountDownLatch在进行线程同步的时候是一种非常有效的选择。

Comments
Write a Comment