【JDK源码分析系列】AbstractQueuedSynchronizer 源码分析 -- 条件队列的重要方法
发布日期:2021-05-07 20:51:21 浏览次数:24 分类:原创文章

本文共 9092 字,大约阅读时间需要 30 分钟。

【JDK源码分析系列】AbstractQueuedSynchronizer 源码分析 -- 条件队列的重要方法

【1】AbstractQueuedSynchronizer 整体架构

1. AQS 中队列只有两个 : 同步队列 + 条件队列,底层数据结构两者都是链表;
2. 图中有四种颜色的线代表四种不同的场景,1、2、3 序号代表看的顺序

【2】条件队列的必要性

主要是因为并不是所有场景一个同步队列就可以搞定的,在遇到锁 + 队列结合的场景时,就需要 Lock + Condition 配合才行,先使用 Lock 来决定哪些线程可以获得锁,哪些线程需要到同步队列里面排队阻塞;获得锁的多个线程在碰到队列满或者空的时候,可以使用 Condition 来管理这些线程,让这些线程阻塞等待,然后在合适的时机后,被正常唤醒,同步队列 + 条件队列联手使用的场景,最多被使用到锁 + 队列的场景中(整体架构途中深绿色和浅蓝色箭头);

【3】条件队列图示解析

【3.1】条件队列的基本结构

并发包中的 Lock (同步器) 可以拥有一个同步队列和多个等待队列

【3.2】等待

【3.3】通知

当前线程加入等待队列(条件队列)

节点从等待队列(条件队列)移动到同步队列

【4】条件队列相关源码分析

【4.1】入队列等待

获得锁的线程,如果在碰到队列满或空的时候,就会阻塞住,这个阻塞就是用条件队列实现的,这个动作我们叫做入条件队列,方法名称为 await,流程见整体架构图中深绿色箭头流向

【4.1.1】AbstractQueuedSynchronizer -- public final void await() throws InterruptedException

// 线程入条件队列public final void await() throws InterruptedException {    if (Thread.interrupted())        throw new InterruptedException();    // 加入到条件队列的队尾    Node node = addConditionWaiter();    // 加入条件队列后,会释放lock申请的资源,唤醒同步队列队列头的节点    // 自己马上就要阻塞了,必须马上释放之前lock的资源,不然自己不被唤醒,别的线程永远得不到该共享资源了    int savedState = fullyRelease(node);    int interruptMode = 0;    // 确认node不在同步队列上,再阻塞,如果 node 在同步队列上,是不能够上锁的    // 这里的情况比较特殊,目前想到的只有一种可能:    // node刚被加入到条件队列中,立马就被其他线程signal转移到同步队列中去了    while (!isOnSyncQueue(node)) {        // this = AbstractQueuedSynchronizer$ConditionObject        // 阻塞在条件队列上        LockSupport.park(this);        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)            break;    }    // 其他线程通过 signal 已经把 node 从条件队列中转移到同步队列中的数据结构中去了    // 所以这里节点苏醒了,直接尝试 acquireQueued    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)        interruptMode = REINTERRUPT;    if (node.nextWaiter != null) // clean up if cancelled        // 如果状态不是CONDITION,就会自动删除        unlinkCancelledWaiters();    if (interruptMode != 0)        reportInterruptAfterWait(interruptMode);}

【4.1.2】AbstractQueuedSynchronizer -- private Node addConditionWaiter()

// 增加新的 waiter 到队列中,返回新添加的 waiter// 如果尾节点状态不是 CONDITION 状态,删除条件队列中所有状态不是 CONDITION 的节点// 如果队列为空,新增节点作为队列头节点,否则追加到尾节点上private Node addConditionWaiter() {    Node t = lastWaiter;    // If lastWaiter is cancelled, clean out.    // 如果尾部的 waiter 不是 CONDITION 状态了,删除    if (t != null && t.waitStatus != Node.CONDITION) {        unlinkCancelledWaiters();        t = lastWaiter;    }    // 新建条件队列 node    Node node = new Node(Thread.currentThread(), Node.CONDITION);    // 队列是空的,直接放到队列头    if (t == null)        firstWaiter = node;    // 队列不为空,直接到队列尾部    else        t.nextWaiter = node;    lastWaiter = node;    return node;}

【4.1.2】AbstractQueuedSynchronizer -- private void unlinkCancelledWaiters()

// 会检查尾部的 waiter 是不是已经不是CONDITION状态了// 如果不是,删除所有的waiterprivate void unlinkCancelledWaiters() {    Node t = firstWaiter;    // trail 可以把状态都是 CONDITION 的 node 串联起来,即使 node 之间有其他节点都可以    Node trail = null;    while (t != null) {        Node next = t.nextWaiter;        // 当前node的状态不是CONDITION,删除自己        if (t.waitStatus != Node.CONDITION) {            //删除当前node            t.nextWaiter = null;            // 如果 trail 是空的,咱们循环又是从头开始的,说明从头到当前节点的状态都不是 CONDITION 的节点            // 都已经被删除了,所以移动队列头结点到当前节点的下一个节点            if (trail == null)                firstWaiter = next;            // 如果找到上次状态是CONDITION的节点的话,先把当前节点删掉,然后把自己挂到上一个状态是 CONDITION 的节点上            else                trail.nextWaiter = next;            // 遍历结束,最后一次找到的CONDITION节点就是尾节点            if (next == null)                lastWaiter = trail;        }        // 状态是 CONDITION 的 Node        else            trail = t;        // 继续循环,循环顺序从头到尾        t = next;    }}

【4.2】 单个唤醒

signal 方法是唤醒的意思,比如之前队列满了,有了一些线程因为 take 操作而被阻塞进条件队列中,突然队列中的元素被线程 A 消费了,线程 A 就会调用 signal 方法,唤醒之前阻塞的线程,会从条件队列的头节点开始唤醒 (流程见整体架构图中浅蓝色部分)。

【4.2.1】AbstractQueuedSynchronizer -- public final void signal()

// 唤醒阻塞在条件队列中的节点public final void signal() {    if (!isHeldExclusively())        throw new IllegalMonitorStateException();    // 从头节点开始唤醒    Node first = firstWaiter;    if (first != null)        // doSignal 方法会把条件队列中的节点转移到同步队列中去        doSignal(first);}

【4.2.2】AbstractQueuedSynchronizer -- private void doSignal(Node first)

// 把等待队列头节点转移到同步队列去private void doSignal(Node first) {    do {        // nextWaiter为空,说明到队尾了        if ( (firstWaiter = first.nextWaiter) == null)            lastWaiter = null;        // 从队列头部开始唤醒,所以直接把头结点.next 置为 null,        // 这种操作其实就是把 node 从条件队列中移除了        // 这里有个重要的点是,每次唤醒都是从队列头部开始唤醒,        // 所以把 next 置为 null 没有关系,如果唤醒是从任意节点        // 开始唤醒的话,就会有问题,容易造成链表的割裂        first.nextWaiter = null;        // 通过while保证transferForSignal能成功        // (first = firstWaiter) != null  = true 的话,        // 表示还可以继续循环, = false 说明队列中的元素已经循环完了    } while (!transferForSignal(first) &&                (first = firstWaiter) != null);}

【4.2.3】AbstractQueuedSynchronizer -- final boolean transferForSignal(Node node)

// 返回 true 表示转移成功, false 失败// 大概思路:// 1. node 追加到同步队列的队尾// 2. 将 node 的前一个节点状态置为 SIGNAL,成功直接返回,失败直接唤醒// 可以看出来 node 的状态其实仍然是 CONDITIONfinal boolean transferForSignal(Node node) {    /*        * If cannot change waitStatus, the node has been cancelled.        */    // 将 node 的状态从 CONDITION 修改成初始化,失败返回 false    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))        return false;    /*     * Splice onto queue and try to set waitStatus of predecessor(前任) to     * indicate(表明) that thread is (probably) waiting. If cancelled or     * attempt to set waitStatus fails, wake up to resync (in which     * case the waitStatus can be transiently and harmlessly wrong).     */    // 当前队列加入到同步队列,返回的 p 是 node 在同步队列中的前一个节点    // 看命名是 p,实际是 pre 前一个单词的缩写    Node p = enq(node);    int ws = p.waitStatus;    // 状态修改成 SIGNAL,如果成功直接返回    // 把当前节点的前一个节点修改成 SIGNAL 的原因,是因为 SIGNAL 本身就表示当前节点后面的节点都是需要被唤醒的    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))        // 如果 p 节点被取消,或者状态不能修改成SIGNAL,直接唤醒        LockSupport.unpark(node.thread);    return true;}

 

【4.3】全部唤醒

signalAll 的作用是唤醒条件队列中的全部节点

【4.3.1】AbstractQueuedSynchronizer -- public final void signalAll()

// 唤醒全部public final void signalAll() {    if (!isHeldExclusively())        throw new IllegalMonitorStateException();    // 拿到头节点    Node first = firstWaiter;    if (first != null)        // 从头节点开始唤醒条件队列中所有的节点        doSignalAll(first);}

【4.3.2】AbstractQueuedSynchronizer -- private void doSignalAll(Node first) 

// 把等待队列所有节点依次转移到同步队列去private void doSignalAll(Node first) {    lastWaiter = firstWaiter = null;    do {        // 拿出条件队列队列头节点的下一个节点        Node next = first.nextWaiter;        // 把头节点从同步队列中删除        first.nextWaiter = null;        // 头节点转移到同步队列中去        transferForSignal(first);        // 开始循环头节点的下一个节点        first = next;    } while (first != null);}

【4.4】条件队列相关的共通方法

//判断该 signal 是否发生final boolean transferAfterCancelledWait(Node node) {    // 将 node 的状态从 CONDITION 修改成初始化    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {        //将节点加入同步队列        enq(node);        return true;    }    /*     * If we lost out to a signal(), then we can't proceed     * until it finishes its enq().  Cancelling during an     * incomplete transfer is both rare and transient, so just     * spin.     */    //当节点不在同步队列时放弃线程对 CPU 的占用    while (!isOnSyncQueue(node))        Thread.yield();    return false;}//释放锁并设置节点的状态为 Node.CANCELLEDfinal int fullyRelease(Node node) {    boolean failed = true;    try {        int savedState = getState();        if (release(savedState)) {            failed = false;            return savedState;        } else {            throw new IllegalMonitorStateException();        }    } finally {        if (failed)            node.waitStatus = Node.CANCELLED;    }}//判断节点是否在同步队列中final boolean isOnSyncQueue(Node node) {    //node.prev == null : 条件队列是单向链表    if (node.waitStatus == Node.CONDITION || node.prev == null)        return false;    if (node.next != null) // If has successor, it must be on queue        return true;    /*        * node.prev can be non-null, but not yet on queue because        * the CAS to place it on queue can fail. So we have to        * traverse from tail to make sure it actually made it.  It        * will always be near the tail in calls to this method, and        * unless the CAS failed (which is unlikely), it will be        * there, so we hardly ever traverse much.        */    return findNodeFromTail(node);}//从尾部到头部遍历条件队列确定节点在条件队列中private boolean findNodeFromTail(Node node) {    Node t = tail;    for (;;) {        if (t == node)            return true;        if (t == null)            return false;        t = t.prev;    }}//检查中断//返回//THROW_IE : 中断发生在 signal 之前//REINTERRUPT : 中断发生在 signal 之后//0 : 没有中断private int checkInterruptWhileWaiting(Node node) {    return Thread.interrupted() ?            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :            0;}private void reportInterruptAfterWait(int interruptMode)    throws InterruptedException {    if (interruptMode == THROW_IE)        throw new InterruptedException();    else if (interruptMode == REINTERRUPT)        selfInterrupt();}//由子类实现//true 已经加锁protected boolean isHeldExclusively() {    throw new UnsupportedOperationException();}

参考致谢
本博客为博主的学习实践总结,并参考了众多博主的博文,在此表示感谢,博主若有不足之处,请批评指正。

【1】Java并发编程的艺术

【2】

【3】

上一篇:【JDK源码分析系列】FutureTask 源码解析
下一篇:【JDK源码分析系列】AbstractQueuedSynchronizer 源码分析 -- 基本属性

发表评论

最新留言

路过按个爪印,很不错,赞一个!
[***.219.124.196]2025年04月12日 08时19分15秒