ArrayBlockingQueue源码分析

今天来看一下ArrayBlockingQueue的源码。从名字上看,能够推测,它使用Array来存储数据,实现了Queue这种数据结构,同时它提供了阻塞的入队和出队操作。事实也是如此。

它的签名如下

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable

它内部是用了数组来进行数据的存储

final Object[] items;

同时,定义了takeIndex和putIndex变量,分别用来表示队列出队元素的序号和队列入队元素的序号。count为队列中元素的个数。

int takeIndex;
int putIndex;
int count;

lock为队列中所使用的锁,而notEmpty和notFull为两个条件队列,分别表示了队列非空和队列非满两个条件。

final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

ArrayBlockingQueue提供了三个版本的构造函数。

首先第一个,接受一个整型参数和一个boolean类型的参数。

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0) {
        throw new IllegalArgumentException();
    }
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}

从以上实现中,可以看出capacity代表了内部数组的容量,而fair则用于标识重入锁是公平锁还是非公平锁。

第二个版本,只接受一个整型参数。

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

从上面的实现可以看出,默认情况下,ArrayBlockingQueue的实现使用的是非公平锁。

第三个版本,稍微复杂一些,除了接收整型参数、boolean型参数之外,第三个参数是一个Collection对象。

在构造ArrayBlockingQueue的同时,将Collection对象中的数组添加到队列中。

public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
    this(capacity, fair);
    
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = 0;
        try {
            for (E e : c) {
                checkNotNull(e);
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}

从以上实现中,可以看出,ArrayBlockingQueue中不允许出现null数据。在传入参数的时候,capacity必须大于Collection对象的长度,否则将会触发ArrayIndexOutBoundsException异常,捕获后重新构造一个IllegalArgumentException来进行抛出。

inc和dec为两个辅助函数。分别实现了序号的递增和递减。在序号递增到数组长度-1的时候,设置为0重新从0开始,在序号递减到0的时候,将序号重新设置为数组长度-1,从新开始。

final int inc(int i) {
    return (++i == items.length) ? 0 : i;
}

final int dec(int i) {
    return (( i == 0) ? items.length : i) - 1;
}

代码中,提供了一个辅助函数insert的实现。

private void insert(E x) {
    items[putIndex] = x;
    putIndex = inc(putIndex);
    ++count;
    notEmpty.signal();
}

上面有说过,putIndex为下一个入队元素的序号。在设置putIndex新序号的时候,使用了inc函数。在数组当前已满的情况下,将putIndex设置为0,下一次的入队操作将覆盖以前插入的元素。最后,不要忘记调用notEmpty.signal(),来唤醒在notEmpty等待队列上等待的线程。

辅助函数extract和insert函数是对应的,用于元素出队列。

private E extract() {
    final Object[] items = this.items;
    E x = this.<E>cast(items[takeIndex]);
    items[takeIndex] = null;
    takeIndex = inc(takeIndex);
    --count;
    notFull.signal();
    return x;
}

看一下offer入队操作

public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length) {
            return false;
        } else {
            insert(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

以上代码可以看出,在队列元素已满的情况下,offer入队操作并没有在notFull条件队列上进行等待,而是简单的 返回false。

add操作使用了offer操作来进行实现

public boolean add(E e) {
    if (offer(e)) {
        return true;
    } else {
        throw new IllegalStateException("Queue full");
    }
}

再来看一下put操作。

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
            notFull.await();
        }
        insert(e);
    } finally {
        lock.unlock();
    }
}

上面说到offer操作,在队列已满的情况下,并不会在notFull条件队列上等待,而是直接返回false。而提供了如此操作的,就是put。

offer操作除了上面的版本,还有一个支持超时的版本。

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
    checkNotNull(e);
    long nanos = unit.ToNanos(timeout);
    final ReentrantLock lock  = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
            if (nanos <= 0) {
                return false;
            }
            nanos = notFull.awaitNanos(nanos);
        }
        insert(e);
        return true;
    } finally {
        lock.unlock();
    }
}

要理解上面的代码,首先需要了解

long awaitNanos(long nanosTimeout) throws InteruptedException;

在awaitNanos函数中,参数nanosTimeout为等待的最长时间,而返回值表示等待时间减去函数调用到返回所花费的时间。

理解了这点之后,再去看offer函数。使用nanos来进行计时。如果在notFull条件队列上的等待时间已经超过了最长等待时间,则不再继续等待,而是返回false。调用这个版本的offer函数,可以避免线程长时间的等待。

这里可以对put、offer、add三个操作做一下对比。

  1. 三个操作都是入队操作。
  2. 从返回值方面来讲,add操作,入队成功返回true,队列已满入队失败会抛出异常。不接收时间参数的offer操作,入队成功返回true,队列已满入队失败会返回false,put无返回值。在队列已满无法入队的时候,会在notFull条件队列上等待。接收时间参数的offer操作和put操作比较类似,不同在于会有一个等待的最大时间,如果在这个等待时间之内还无法完成入队操作,则将返回false,入队成功的话,返回true。

看完了put、offer操作,与之对应的take、poll操作就比较直观了。在语义上,正好一一对应。但是,add操作与remove操作并不能一一对应。因为add操作,可以对应着入队操作,但remove操作对应的不是出队操作,更恰当的描述是从List中删除某个元素的操作。

下面来看一下clear函数。

public void clear() {
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        for (int i = takeIndex, k = count; k > 0; i = inc(i)) {
            items[i] = null;
        }
        count = 0;
        putIndex = 0;
        takeIndex = 0;
        notFull.signalAll();
    } finally {
        lock.unlock();
    }
}

在以上的操作中,将队列清空之后调用了notFull.signalAll()来唤醒所有在这个条件队列上等待的线程,而在上面的insert和extract中调用的是signal方法。这是因为,在出队和入队这两个单个元素的操作上,只要唤醒一个线程来执行相关操作即可。唤醒的线程的操作执行完成后,可能这个条件队列的条件就重新变得不满足。为了避免唤醒线程之后无法执行操作,而又重新进入等待状态,因此调用了signal方法。而在clear方法之中,调用了signalAll方法,可以唤醒所有等待的线程。在某个线程执行完操作之后,条件队列的条件可能仍旧满足,因此其他的线程仍然能部执行操作。

这种方式技巧,在drainTo函数中同样得到了应用。

Comments
Write a Comment