【JDK源码分析系列】ArrayBlockingQueue源码分析
发布日期:2021-05-07 20:52:21 浏览次数:9 分类:技术文章

本文共 9016 字,大约阅读时间需要 30 分钟。

【JDK源码分析系列】ArrayBlockingQueue源码分析

【1】ArrayBlockingQueue 继承体系图示

【2】ArrayBlockingQueue 源码分析

【2.1】ArrayBlockingQueue 主要属性

public class ArrayBlockingQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable { // 1. 利用数组存储元素 // 2. 通过放指针和取指针来标记下一次操作的位置 // 3. 利用重入锁来保证并发安全 // // 队列存放在 Object 的数组里面 // 数组大小必须在初始化的时候手动设置,没有默认大小 final Object[] items; // 下次拿数据的时候的索引位置 int takeIndex; // 下次放数据的索引位置 int putIndex; // 当前已有元素的数量 int count; // 可重入的锁,保证并发访问 final ReentrantLock lock; // 非空条件 private final Condition notEmpty; // 非满条件 private final Condition notFull;}

【2.2】ArrayBlockingQueue 构造方法

public class ArrayBlockingQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable { // 1. 有界的阻塞数组,容量一旦创建,后续大小无法修改 // 2. 元素是有顺序的,按照先入先出进行排序,从队尾插入数据数据,从队头拿数据 // 3. 队列满时,往队列中 put 数据会被阻塞,队列空时,往队列中拿数据也会被阻塞 // // 公平和非公平指的是读写锁的,比如说现在队列是满的,还有很多线程执行 // put 操作,必然会有很多线程等待,在队列不满时,会唤醒等待的线程 // fair 如果是 true 话,就会按照线程等待的排队顺序唤醒线程 // 如果是 false 的话,就会随机唤醒线程 // 通过利用锁的公平和非公平,来实现了 put 和 take 阻塞被唤醒时的公平和非公平 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); // 初始化数组,初始化时必须传入容量即数组的大小 this.items = new Object[capacity]; // 创建重入锁及两个条件 lock = new ReentrantLock(fair); // 队列不为空 Condition,在 put 成功时使用 notEmpty = lock.newCondition(); // 队列不满 Condition,在 take 成功时使用 notFull = lock.newCondition(); } public ArrayBlockingQueue(int capacity, boolean fair, Collection
c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { // i 代表插入的位置 int i = 0; try { // 注意,如果 c 的大小超过了数组的大小会抛异常 for (E e : c) { // 元素不能为null checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } // 更新已有数据数量 count = i; // 如果插入的位置,正好是队尾了,下次需要从 0 开始插入 putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }}

【2.3】ArrayBlockingQueue 添加元素

public class ArrayBlockingQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable { public boolean add(E e) { // 调用父类AbstractQueue的add方法 // 在父类AbstractQueue的add方法中实际调用了offer方法 // offer方法由子类覆写 return super.add(e); } // 新增 public boolean offer(E e) { // 判断元素是否为null checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { // 如果队列是满的,直接返回false if (count == items.length) return false; else { // 新增 // 如果数组没满就调用入队方法并返回true enqueue(e); return true; } } finally { lock.unlock(); } } public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { // 判断元素是否为null checkNotNull(e); // 获取timeout值 long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; // 加锁,如果线程中断了抛出异常 lock.lockInterruptibly(); try { // 如果数组满了,就阻塞nanos纳秒 // 如果唤醒该线程时依然没有空间且时间到了就返回false while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } // 元素入队 enqueue(e); return true; } finally { lock.unlock(); } } // 新增,如果队列满,无限阻塞 public void put(E e) throws InterruptedException { // 元素不能为空 checkNotNull(e); final ReentrantLock lock = this.lock; // 加锁,如果线程中断了抛出异常 lock.lockInterruptibly(); try { // 队列如果是满的,就无限等待 // 一直等待队列中有数据被拿走时才会被唤醒 while (count == items.length) notFull.await(); // 元素入队 enqueue(e); } finally { lock.unlock(); } } private void enqueue(E x) { // 获取元素数组 final Object[] items = this.items; // putIndex 为本次插入的位置 items[putIndex] = x; // ++ putIndex 计算下次插入的位置 // 如果下次插入的位置,正好等于队尾,下次插入就从 0 开始 if (++putIndex == items.length) putIndex = 0; // 队列中已有元素数量增1 count++; // 唤醒notEmpty,因为入队了一个元素,所以肯定不为空 notEmpty.signal(); }}

【2.4】ArrayBlockingQueue 获取元素

public class ArrayBlockingQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable { public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 加锁,如果线程中断了抛出异常 lock.lockInterruptibly(); try { // 如果队列为空,无限等待 // 直到队列中有数据被 put 后被唤醒 // 如果队列无元素,则阻塞等待在条件notEmpty上 while (count == 0) notEmpty.await(); // 从队列中拿数据 return dequeue(); } finally { lock.unlock(); } } public E poll() { final ReentrantLock lock = this.lock; // 加锁 lock.lock(); try { // 如果队列没有元素则返回null,否则出队 return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } public E poll(long timeout, TimeUnit unit) throws InterruptedException { // 获取timeout long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; // 加锁,如果线程中断了抛出异常 lock.lockInterruptibly(); try { // 如果队列无元素,则阻塞等待nanos纳秒 // 如果下一次这个线程获得了锁但队列依然无元素且已超时就返回null while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } // 做出队处理 return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // 获取数组 final Object[] items = this.items; // 获取取索引处的元素 // takeIndex 代表本次拿数据的位置,是上一次拿数据时计算好的 E x = (E) items[takeIndex]; // 把取指针位置设为null // 帮助 gc items[takeIndex] = null; // ++ takeIndex 计算下次拿数据的位置 // 如果正好等于队尾的话,下次就从 0 开始拿数据 if (++takeIndex == items.length) takeIndex = 0; // 队列实际大小减 1 count--; if (itrs != null) // 与迭代器相关的操作 itrs.elementDequeued(); // 唤醒notFull条件 // 唤醒被队列满阻塞的线程 notFull.signal(); return x; }}

【2.5】ArrayBlockingQueue 删除元素

情况一,takeIndex == removeIndex

情况二, takeIndex != removeIndex

1). removeIndex + 1 != putIndex

2). removeIndex + 1 == putIndex

public class ArrayBlockingQueue
extends AbstractQueue
implements BlockingQueue
, java.io.Serializable { // 情况1. 删除位置和 takeIndex 一样 // 比如 takeIndex 是 2,而要删除的位置正好也是 2,则将位置 2 的数据置为 null ,并重新计算 takeIndex 为 3 // // 情况2. 删除位置和 takeIndex 不一样 // 找到要删除元素的下一个并比较其和 putIndex 的关系 // 如果下一个元素不是 putIndex,就把下一个元素往前移动一位 // 如果下一个元素是 putIndex,把 putIndex 的值修改成删除的位置 void removeAt(final int removeIndex) { // 获取数组 final Object[] items = this.items; // 情况1,如果删除位置正好等于下次要拿数据的位置 // 即removeIndex == takeIndex if (removeIndex == takeIndex) { // 下次要拿数据的位置直接置空 items[takeIndex] = null; // 要拿数据的位置往后移动一位 if (++takeIndex == items.length) takeIndex = 0; count--; // 处理迭代器相关的逻辑 if (itrs != null) itrs.elementDequeued(); // 情况 2,如果删除位置不是下次要拿数据的位置 // 即removeIndex != takeIndex } else { // 获取下次放入数据的位置 final int putIndex = this.putIndex; for (int i = removeIndex;;) { // 找到要删除元素的下一个 int next = i + 1; if (next == items.length) next = 0; // 下一个元素不是 putIndex if (next != putIndex) { // 下一个元素往前移动一位 items[i] = items[next]; i = next; // 下一个元素是 putIndex } else { // 删除元素 items[i] = null; // 下次放元素时,应该从本次删除索引处放置 this.putIndex = i; break; } } // 数组中已有元素数量减一 count--; // 迭代器相关的处理 if (itrs != null) itrs.removedAt(removeIndex); } // 唤醒notFull条件 // 唤醒被队列满阻塞的线程 notFull.signal(); }}

致谢

本博客为博主的学习实践总结,并参考了众多博主的博文,在此表示感谢,博主若有不足之处,请批评指正。

【1】面试官系统精讲Java源码及大厂真题

【2】

上一篇:【JDK源码分析系列】PriorityQueue源码分析
下一篇:【JAVA 网络编程系列】Netty -- 线程池

发表评论

最新留言

关注你微信了!
[***.104.42.241]2025年03月19日 00时49分04秒