引言
JDK中除了上文提到的各种并发容器,还提供了丰富的阻塞队列。阻塞队列统一实现了BlockingQueue
//插入元素,队列满后会抛出异常 boolean add(E e); //移除元素,队列为空时会抛出异常 E remove(); //插入元素,成功反会true boolean offer(E e); //移除元素 E poll(); //插入元素,队列满后会阻塞 void put(E e) throws InterruptedException; //移除元素,队列空后会阻塞 E take() throws InterruptedException; //限时插入 boolean offer(E e, long timeout, TimeUnit unit) //限时移除 E poll(long timeout, TimeUnit unit); //获取所有元素到Collection中 int drainTo(Collection<? super E> c);JDK1.8中的阻塞队列实现共有7个,分别是ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue、SynchronousQueue、LinkedTransferQueue以及LinkedBlockingDeque,下面就来一一对他们进行一个简单的分析。
ArrayBlockingQueue
ArrayBlockingQueue是一个底层用数组实现的有界阻塞队列,有界是指他的容量大小是固定的,不能扩充容量,在初始化时就必须确定队列大小。它通过可重入的独占锁ReentrantLock来控制并发,Condition来实现阻塞。
//通过数组来存储队列中的元素 final Object[] items; //初始化一个固定的数组大小,默认使用非公平锁来控制并发 public ArrayBlockingQueue(int capacity) { this(capacity, false); } //初始化固定的items数组大小,初始化notEmpty以及notFull两个Condition来控制生产消费 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair);//通过ReentrantLock来控制并发 notEmpty = lock.newCondition(); notFull = lock.newCondition(); }可以看到ArrayBlockingQueue初始化了一个ReentrantLock以及两个Condition,用来控制并发下队列的生产消费。这里重点看下阻塞的put以及take方法:
//插入元素到队列中 public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //获取独占锁 try { while (count == items.length) //如果队列已满则通过await阻塞put方法 notFull.await(); enqueue(e); //插入元素 } finally { lock.unlock(); } } private void enqueue(
