LinkedBlockingQueue源码分析

作为线程池中经常使用的阻塞队列,在分析了ArrayBlockingQueue之后,再来看一下LinkedBlockingQueue的实现。

LinkedBlockingQueue的签名如下

public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable

在LinkedBlockingQueue中节点的声明如下

static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}

可以看出Node节点只有一个next指向,说明链表是单向的,只能提供从前节点到后节点的查找。

private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();

与ArrayBlockingQueue相同的是,LinkedBlockingQueue也使用了notFull和notEmpty这两个条件队列。不同的是,ArrayBlockingQueue内部只使用了一个锁,在take操作和put操作上都使用同一个锁。而在LinkedBlockingQueue中,使用了两个锁,分别供take操作和put操作来使用,并且notFull和notEmpty这两个条件队列,分别在put操作和take操作上进行等待。这种锁的细分,是能保证LinkedBlockingQueue比ArrayBlockingQueue提供更高吞吐量的关键。

LinkedBlockingQueue中提供了head和last变量,分别用于表示队列的头节点和尾节点。需要注意的是在进行LinkedBlockingQueue构造的时候,会提前构造head和last节点,来当作哨兵节点,并且其item为null。其不变性条件为

head.item == null;
tail.next = null;

看一下构造函数。第一个版本的构造函数接收一个整型的变量。

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

在另一个版本不接受capacity参数的无参构造函数中,会默认设置capacity为Integer.MAX_VALUE。

下面看两个内部的辅助函数,signalNotEmpty和signalNotFull。

signalNotEmpty函数,用作唤醒在notEmpty条件队列上等待的线程。一般情况下,出队的时候,如果当前队列已空,则线程会在notEmpty条件队列上等待队列非空状态。为了防止出队操作的影响,在这个函数中使用takeLock来进行保护是必须的。

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

同样的道理,signalNotFull函数,用作唤醒在notFull条件队列上等待的线程。为了防止其他入队操作的影响,在这个函数中使用putLock来进行保护也是必须的。

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

然后接下来看一下入队辅助函数

private void enqueue(Node<E> node) {
    last = last.next = node;
    // last.next = node;
    // last = last.next;
}

注意以上函数的使用前提是当前线程拥有putLock。

出队辅助函数是

private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

前面已经说过,在LinkedBlockingQueue中,head节点是一个哨兵,其不变性是head.item == null,哨兵的下一个节点才是真正的第一个节点。因此,在获取到head.next之后,保存其item的值,然后将其next置为null,并将head指向head.next即完成了入队操作。

下面来看核心的put、offer、add、poll、take等操作。在分析ArrayBlockingQueue的时候,曾经总结过put、add、offer函数的区别。现在回想一下,在ArrayBlockingQueue中,add操作使用无参的offer函数来实现,在入队成功的时候,返回true;入队失败,抛出异常。无参的offer函数在入队成功的时候,返回true;入队失败的时候,返回false。put函数无返回值,在队列已满的情况下,会进行等待。而有参数的offer函数,则队列已满的情况下,等待有最大时间,超过最大时间而无法入队的话,则返回false。那么在LinkedBlockingQueue中,仍是如此吗?

首先来看一下put函数。

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInteruptibly();
    try {
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity) {
            notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0) {
        signalNotEmpty();
    }
}

可以看出,此方法的实现和ArrayBlockingQueue中的put方法基本一致。需要思考的是,在signalNotEmpty的时候为什么提前判断了c的值呢?我认为还是基于性能的考虑,在不判断的前提下,直接调用signalNotEmpty绝对算不得错。

无参的offer函数。上面说到,无参的offer函数应该在队列已满的情况下,应该直接返回false。

public boolean offer(E e) {
    final AtomicInteger count = this.count;
    if (count.get() == capacity) return false;
    ....
}

接收时间参数的offer函数,则提供了带等待时间的入队操作。

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
    while (count.get() == capacity) {
        if (nanos <= 0) return false;
        nanos = notFull.awaitNanos(nanos);
        ....
    }
}

可以看出,在语义方法,LinkedBlockingQueue和ArrayBlockingQueue是完全一致的。

与之对应的,take函数和poll函数不再一一说明。

Comments
Write a Comment