认识BlockingQueue
BlockingQueue是一种可以阻塞线程的队列,java中对这种队列提供了方法抽象,BlockingQueue则是抽象的接口。
-
add:添加元素到队列里,添加成功返回true,由于容量满了添加失败会抛出IllegalStateException异常
-
offer:添加元素到队列里,添加成功返回true,添加失败返回false
-
put:添加元素到队列里,如果容量满了会阻塞直到容量不满
-
poll:删除队列头部元素,如果队列为空,返回null。否则返回元素。
-
remove:基于对象找到对应的元素,并删除。删除成功返回true,否则返回false
-
take:删除队列头部元素,如果队列为空,一直阻塞到队列有元素并删除
参考文章
认识一下ArrayBlockingQueue和LinkedBlockingQueue
这两个类是数组与链表的实现,这两个各有特点:
ArrayBlockingQueue
- 初始化时要指定长度
- 单个锁控制,读与写共用一个锁
- 基于array定位查找快
LinkedBlockingQueue
- 初始化时默认为Integer.MAX_VALUE
- 使用分离锁,写入时用的putLock,读取时用的takeLock
- 生产消息的时候需要转换为Node,有性能损耗
阻塞的原理
写时的阻塞
因为写入时阻塞主要是put方法,所以可以通过两个实现类的put方法来看一下是如何实现。
- ArrayBlockingQueue
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
可以看到这里会获取到一个锁,然后在在入队之前会有一个while,条件是count==item.length,其中count是指的当前队列已经写入的数据项个数,item是用于存数据的一个数组。也就是说如果当前队列的数据项等于数组的长度了,说明已经满了,此时则调用noteFull.await()阻塞当前线程;
- LinkedBlockingQueue
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Nodenode = new Node (e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
LinkedBlockingQueue在实现put时确实麻烦一些,只不过阻塞的模式是一样的,都是通过判断容易是否已经写满。
读时的阻塞
读时的配对方法是take,这个方法会对读取进行阻塞。
- LinkedBlockingQueue
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
可以看到原理一样,只不过这里用的是notEmpty这个条件对象,意思表示空的时候等待。
- LinkedBlockingQueue
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
同理LinkedBlockingQueue的实现也是一样的。