本文共 3930 字,大约阅读时间需要 13 分钟。
简述:在jdk concurrent包下面提供了一些同步阻塞队列,这些队列在使用线程池的时候,会把超过核心线程数的任务放在阻塞队列当中,这篇文章主要是来看下ArrayBlockingQueue的源码。
ArrayBlockingQueue是一个线程安全的有界队列,它的主要实现方式是Object[]数组,ReentrantLock,及ReenrantLock中的Condition ,如果对ReentrantLock不熟悉的可以看下之前的博客
从继承体系上来看,满足队列的基本要求,入队列,出队列,属于集合,可以使用Iterater进行遍历,
BlockingQueue除了继承自父接口的一些方法,也提供了对于阻塞队列的一些方法
比如超时的offer, poll等自己的一些方法
下面我们来看ArrayBlockingQueue的基本属性
public class ArrayBlockingQueueextends AbstractQueue implements BlockingQueue , java.io.Serializable {final Object[] items; //用来存储元素的数组// 下一次执行task,poll,peek或者remove的索引 int takeIndex;// 下一次执行put,offer,或者add的索引 int putIndex;//当前数组中存放的元素数量 int count;// 提供的lock锁 final ReentrantLock lock;// 队列空了,就挂起,等到有元素放进去后唤醒 private final Condition notEmpty;// 队列满了就挂起,等到取出一个元素后唤醒 private final Condition notFull;}
关于队列的初始化方法,构造方法
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
初始化数据长度,设定好是否是公平锁,默认是false,非公平锁
public ArrayBlockingQueue(int capacity, boolean fair, Collection c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }
这个构造方法在原来的基础上,又增加了一个集合,会将集合当中的元素保存在数组当中,同时修改putIndex的值和count的值
在获取锁这个地方给出了一个注释,锁在这里是保证了可见性,并不是互斥访问,这里加锁是为了让线程将数据写会主存,保证数据的可见性,比如线程A初始化了队列,线程B对队列添加了元素,如果没有加锁,线程A和线程B都没有将数据写回主存,就会造成多线程下,数据错误问题。
关于添加元素操作,有以下几个
add(E e) 队列没有满时,添加元素,满了的话,抛出异常
offer(E e) 没有满添加元素,返回true, 满了的话,返回false
offer(E e, time,unit) 在指定的时间内,插入元素,插入成功返回true, 超过指定时间没有插入元素返回false
put(E e) 如果队列已经满了,一直等待,直到有空余再往里面添加元素
来看下put方法的源码实现
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); //等待被唤醒 enqueue(e); } finally { lock.unlock(); } }
private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) //判断如果队列已经满了,将putIndex值设为0 putIndex = 0; count++; notEmpty.signal(); //唤醒阻塞在take上的线程 }
接下来是关于队列中元素的移除
remove(Object ) 移除队列当中指定元素,如果找到需要移除的元素返回true,否则返回false
poll() 如果当前没有元素返回null,如果有元素直接取出
poll(time ,unit) 如果没有元素,在等待时间内如果队列中放入元素,取出队列中元素,超过等待时间返回
task() 如果队列中没有元素,一直等待,直到队列中有元素,并且取出队列当中的元素
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); //等待队列中放入元素 return dequeue(); } finally { lock.unlock(); } }
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
以上就是关于ArrayBlockingQueue的分析,如果掌握了ReentrantLock的知识对ArrayBlockingQueue理解起来是比较容易的
如有错误,欢迎指正~