AQS
发布日期:2021-05-06 17:45:29 浏览次数:55 分类:原创文章

本文共 22949 字,大约阅读时间需要 76 分钟。

AQS则实现了对同步状态的管理,以及对阻塞线程进行排队,等待通知等等一些底层的实现处理。AQS的核心也包括了这些方面:同步队列,独占式锁的获取和释放,共享锁的获取和释放以及可中断锁,超时等待锁获取这些特性的实现。尽管队列是FIFO,但是入队本身不保证公平性,需要实现锁的自行实现。

0 前提

AQS的子类如果用到更改此状态的方法,必须覆盖定义受保护方法,tryAcquire、tryAcquireShared或tryRelease、tryReleaseShared(不是全部,用到排他覆盖排他,用到共享定义共享覆盖共享)。

1 功能

1.1 操作资源的方法

AQS通过暴露以下API来让让用户自己解决上面提到的“如何定义资源是否可以被访问”的问题:

tryAcquire 排它获取(资源数)
tryRelease 排它释放(资源数)
tryAcquireShared 共享获取(资源数)
tryReleaseShared 共享获取(资源数)
isHeldExclusively 是否排它状态

1.2 支持中断、超时

还记得Lock接口中的那些锁中断、限时等待、锁尝试的方法吗?这些方法的实现其实AQS都内置提供了。
使用了AQS框架的同步器,都支持下面的操作:

阻塞和非阻塞(例如tryLock)同步;
可选的超时设置,让调用者可以放弃等待;
可中断的阻塞操作。

1.3 支持独占模式和共享模式

1.4 支持Condition条件等待

2 组成

围绕着资源,衍生出三个基本问题:
1 同步状态(synchronization state)的管理
2 阻塞/唤醒线程的操作
3 线程等待队列的管理

2.1 同步状态State

同步状态,其实就是资源。AQS使用单个int(32位)来保存同步状态,并暴露出getState、setState以及compareAndSetState操作来读取和更新这个状态。

AbstractQueuedLongSynchronizer与AbstractQueuedSynchronize的区别就是State是64位的long类型。

同步器 资源的定义
ReentrantLock 资源表示独占锁。State为0表示锁可用;为1表示被占用;为N表示重入的次数
CountDownLatch 资源表示倒数计数器。State为0表示计数器归零,所有线程都可以访问资源;为N表示计数器未归零,所有线程都需要阻塞。
Semaphore 资源表示信号量或者令牌。State≤0表示没有令牌可用,所有线程都需要阻塞;大于0表示由令牌可用,线程每获取一个令牌,State减1,线程没释放一个令牌,State加1。
ReentrantReadWriteLock 资源表示共享的读锁和独占的写锁。state逻辑上被分成两个16位的unsigned short,分别记录读锁被多少线程使用和写锁被重入的次数。

2.2 阻塞与唤醒

LockSupport类来作为线程阻塞和唤醒的工具。底层是native方法,

2.3 静态内部类Node

在AQS有一个静态内部类Node,其中有这样一些属性:

结点一共有两种类型:独占(EXCLUSIVE)和共享(SHARED)。

static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
volatile int waitStatus //节点状态
volatile Node prev //当前节点/线程的前驱节点
volatile Node next; //当前节点/线程的后继节点
volatile Thread thread;//加入同步队列的线程引用
Node nextWaiter;//等待队列中的下一个节点

节点的waitStatus 状态有以下这些:

int CANCELLED = 1//节点从同步队列中取消
int SIGNAL = -1//后继节点的线程处于等待状态,如果当前节点释放同步状态会通知后继节点,使得后继节点的线程能够运行;
int CONDITION = -2//当前节点进入等待队列中
int PROPAGATE = -3//表示下一次共享式同步状态获取将会无条件传播下去
int INITIAL = 0;//初始状态

2.4 同步队列

Craig,Landin和Hagersten锁(CLH锁)的一种变种,采用双向链表实现,因此也叫CLH队列。

    private transient volatile Node head;    private transient volatile Node tail;    private volatile int state;

独占式锁:

void acquire(int arg):独占式获取同步状态,如果获取失败则插入同步队列进行等待; void acquireInterruptibly(int arg):与acquire方法相同,但在同步队列中进行等待的时候可以检测中断; boolean tryAcquireNanos(int arg, long nanosTimeout):在acquireInterruptibly基础上增加了超时等待功能, 在超时时间内没有获得同步状态返回false; boolean release(int arg):释放同步状态,该方法会唤醒在同步队列中的下一个节点

共享式锁:

void acquireShared(int arg):共享式获取同步状态,与独占式的区别在于同一时刻有多个线程获取同步状态;void acquireSharedInterruptibly(int arg):在acquireShared方法基础上增加了能响应中断的功能;boolean tryAcquireSharedNanos(int arg, long nanosTimeout):在acquireSharedInterruptibly基础上增加了超时等待的功能;boolean releaseShared(int arg):共享式释放同步状态

也就是说AQS实际上通过头尾指针来管理同步队列,同时实现包括获取锁失败的线程进行入队,释放锁时对同步队列中的线程进行通知等核心方法。其示意图如下

img

通过对源码的理解以及做实验的方式,现在我们可以清楚的知道这样几点:

  1. 节点的数据结构,即AQS的静态内部类Node,节点的等待状态等信息
  2. 同步队列是一个双向队列,AQS通过持有头尾指针管理同步队列

那么,节点如何进行入队和出队是怎样做的了?实际上这对应着锁的获取和释放两个操作:获取锁失败进行入队操作,获取锁成功进行出队操作。

2.4.1 独占锁的获取(acquire方法)

我们继续通过看源码和debug的方式来看,还是以上面的demo为例,调用lock()方法是获取独占式锁,获取失败就将当前线程加入同步队列,成功则线程执行。而lock()方法实际上会调用AQS的**acquire()**方法,源码如下

public final void acquire(int arg) {           //先看同步状态是否获取成功,如果成功则方法结束返回        //若失败则先调用addWaiter()方法再调用acquireQueued()方法        if (!tryAcquire(arg) &&            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))            selfInterrupt();}

关键信息请看注释,acquire根据当前获得同步状态成功与否做了两件事情:1. 成功,则方法结束返回,2. 失败,则先调用addWaiter()然后在调用acquireQueued()方法。

获取同步状态失败,入队操作

当线程获取独占式锁失败后就会将当前线程加入同步队列,那么加入队列的方式是怎样的了?我们接下来就应该去研究一下addWaiter()和acquireQueued()。addWaiter()源码如下:

private Node addWaiter(Node mode) {           // 1. 将当前线程构建成Node类型        Node node = new Node(Thread.currentThread(), mode);        // Try the fast path of enq; backup to full enq on failure        // 2. 尝试快速添加尾节点        Node pred = tail;        if (pred != null) {               // 2.2 将当前节点尾插入的方式插入同步队列中            node.prev = pred;            if (compareAndSetTail(pred, node)) {                   pred.next = node;                return node;            }        }        // 2.1. 当前同步队列尾节点为null,说明当前线程是第一个加入同步队列进行等待的线程        enq(node);        return node;}

分析可以看上面的注释。程序的逻辑主要分为两个部分:**1. 当前同步队列的尾节点为null,调用方法enq()插入;2. 当前队列的尾节点不为null,则采用尾插入(compareAndSetTail()方法)的方式入队。**另外还会有另外一个问题:如果 if (compareAndSetTail(pred, node))为false怎么办?会继续执行到enq()方法,同时很明显compareAndSetTail是一个CAS操作,通常来说如果CAS操作失败会继续自旋(死循环)进行重试。因此,经过我们这样的分析,enq()方法可能承担两个任务:**1. 处理当前同步队列尾节点为null时进行入队操作;2. 如果CAS尾插入节点失败后负责自旋进行尝试。**那么是不是真的就像我们分析的一样了?只有源码会告诉我们答案:),enq()源码如下:

private Node enq(final Node node) {           for (;;) {               Node t = tail;            if (t == null) {    // Must initialize                //1. 构造头结点                if (compareAndSetHead(new Node()))                    tail = head;            } else {                   // 2. 尾插入,CAS操作失败自旋尝试                node.prev = t;                if (compareAndSetTail(t, node)) {                       t.next = node;                    return t;                }            }        }}

在上面的分析中我们可以看出在第1步中会先创建头结点,说明同步队列是带头结点的链式存储结构。带头结点与不带头结点相比,会在入队和出队的操作中获得更大的便捷性,因此同步队列选择了带头结点的链式存储结构。那么带头节点的队列初始化时机是什么?自然而然是在tail为null时,即当前线程是第一次插入同步队列。compareAndSetTail(t, node)方法会利用CAS操作设置尾节点,如果CAS操作失败会在for (;;)for死循环中不断尝试,直至成功return返回为止。因此,对enq()方法可以做这样的总结:

  1. 在当前线程是第一个加入同步队列时,调用compareAndSetHead(new Node())方法,完成链式队列的头结点的初始化
  2. 自旋不断尝试CAS尾插入节点直至成功为止

现在我们已经很清楚获取独占式锁失败的线程包装成Node然后插入同步队列的过程了?那么紧接着会有下一个问题?在同步队列中的节点(线程)会做什么事情了来保证自己能够有机会获得独占式锁了?带着这样的问题我们就来看看acquireQueued()方法,从方法名就可以很清楚,这个方法的作用就是排队获取锁的过程,源码如下:

final boolean acquireQueued(final Node node, int arg) {           boolean failed = true;        try {               boolean interrupted = false;            for (;;) {                   // 1. 获得当前节点的先驱节点                final Node p = node.predecessor();                // 2. 当前节点能否获取独占式锁                                  // 2.1 如果当前节点的先驱节点是头结点并且成功获取同步状态,即可以获得独占式锁                if (p == head && tryAcquire(arg)) {                       //队列头指针用指向当前节点                    setHead(node);                    //释放前驱节点                    p.next = null; // help GC                    failed = false;                    return interrupted;                }                // 2.2 获取锁失败,线程进入等待状态等待获取独占式锁                if (shouldParkAfterFailedAcquire(p, node) &&                    parkAndCheckInterrupt())                    interrupted = true;            }        } finally {               if (failed)                cancelAcquire(node);        }}

程序逻辑通过注释已经标出,整体来看这是一个这又是一个自旋的过程(for (;😉),代码首先获取当前节点的先驱节点,如果先驱节点是头结点的并且成功获得同步状态的时候(if (p == head && tryAcquire(arg))),当前节点所指向的线程能够获取锁。反之,获取锁失败进入等待状态。整体示意图为下图:

img

获取锁的节点出队的逻辑是:获取锁成功,前一个节点出队操作。将当前节点通过setHead()方法设置为队列的头结点,**也就是说release方法没有释放节点,而是在acquireQueued节点的setHead()方法。**然后将之前的头结点的next域设置为null并且pre域也为null,即与队列断开,无任何引用方便GC时能够将内存进行回收。示意图如下:

//队列头结点引用指向当前节点setHead(node);//释放前驱节点p.next = null; // help GCfailed = false;return interrupted;

img

那么当获取锁失败的时候会调用shouldParkAfterFailedAcquire()方法和parkAndCheckInterrupt()方法,看看他们做了什么事情。shouldParkAfterFailedAcquire()方法源码为:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {       int ws = pred.waitStatus;    if (ws == Node.SIGNAL)        /*         * This node has already set status asking a release         * to signal it, so it can safely park.         */        return true;    if (ws > 0) {           /*         * Predecessor was cancelled. Skip over predecessors and         * indicate retry.         */        do {               node.prev = pred = pred.prev;        } while (pred.waitStatus > 0);        pred.next = node;    } else {           /*         * waitStatus must be 0 or PROPAGATE.  Indicate that we         * need a signal, but don't park yet.  Caller will need to         * retry to make sure it cannot acquire before parking.         */        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);    }    return false;}

shouldParkAfterFailedAcquire()方法主要逻辑是使用compareAndSetWaitStatus(pred, ws, Node.SIGNAL)使用CAS将节点状态由INITIAL设置成SIGNAL,表示当前线程阻塞。当compareAndSetWaitStatus设置失败则说明shouldParkAfterFailedAcquire方法返回false,然后会在acquireQueued()方法中for (;;)死循环中会继续重试,直至compareAndSetWaitStatus设置节点状态位为SIGNAL时shouldParkAfterFailedAcquire返回true时才会执行方法parkAndCheckInterrupt()方法,该方法的源码为:

private final boolean parkAndCheckInterrupt() {           //使得该线程阻塞        LockSupport.park(this);        return Thread.interrupted();}

该方法的关键是会调用LookSupport.park()方法(关于LookSupport会在以后的文章进行讨论),该方法是用来阻塞当前线程的。因此到这里就应该清楚了,acquireQueued()在自旋过程中主要完成了两件事情:

  1. 如果当前节点的前驱节点是头节点,并且能够获得同步状态的话,当前线程能够获得锁该方法执行结束退出
  2. 获取锁失败的话,先将节点状态设置成SIGNAL,然后调用LookSupport.park方法使得当前线程阻塞

经过上面的分析,独占式锁的获取过程也就是acquire()方法的执行流程如下图所示:

img

独占式锁获取(acquire()方法)流程图.png

2.4.2 独占锁的释放(release()方法)

独占锁的释放就相对来说比较容易理解了,废话不多说先来看下源码:

public final boolean release(int arg) {           if (tryRelease(arg)) {               Node h = head;            if (h != null && h.waitStatus != 0)                unparkSuccessor(h);            return true;        }        return false;}

这段代码逻辑就比较容易理解了,如果同步状态释放成功(tryRelease返回true)则会执行if块中的代码,当head指向的头结点不为null,并且该节点的状态值不为0的话才会执行unparkSuccessor()方法。unparkSuccessor方法源码:

private void unparkSuccessor(Node node) {       /*     * If status is negative (i.e., possibly needing signal) try     * to clear in anticipation of signalling.  It is OK if this     * fails or if status is changed by waiting thread.     */    int ws = node.waitStatus;    if (ws < 0)        compareAndSetWaitStatus(node, ws, 0);    /*     * Thread to unpark is held in successor, which is normally     * just the next node.  But if cancelled or apparently null,     * traverse backwards from tail to find the actual     * non-cancelled successor.     */    //头节点的后继节点    Node s = node.next;    if (s == null || s.waitStatus > 0) {           s = null;        for (Node t = tail; t != null && t != node; t = t.prev)            if (t.waitStatus <= 0)                s = t;    }    if (s != null)        //后继节点不为null时唤醒该线程        LockSupport.unpark(s.thread);}

源码的关键信息请看注释,首先获取头节点的后继节点,当后继节点的时候会调用LookSupport.unpark()方法,该方法会唤醒该节点的后继节点所包装的线程。因此,每一次锁释放后就会唤醒队列中该节点的后继节点所引用的线程,从而进一步可以佐证获得锁的过程是一个FIFO(先进先出)的过程。

到现在我们终于啃下了一块硬骨头了,通过学习源码的方式非常深刻的学习到了独占式锁的获取和释放的过程以及同步队列。可以做一下总结:

  1. 线程获取锁失败,线程被封装成Node进行入队操作,核心方法在于addWaiter()和enq(),同时enq()完成对同步队列的头结点初始化工作以及CAS操作失败的重试;
  2. 线程获取锁是一个自旋的过程,当且仅当 当前节点的前驱节点是头结点并且成功获得同步状态时,节点出队即该节点引用的线程获得锁,否则,当不满足条件时就会调用LookSupport.park()方法使得线程阻塞
  3. 释放锁的时候会唤醒后继节点;

总体来说:在获取同步状态时,AQS维护一个同步队列,获取同步状态失败的线程会加入到队列中进行自旋;移除队列(或停止自旋)的条件是前驱节点是头结点并且成功获得了同步状态。在释放同步状态时,同步器会调用unparkSuccessor()方法唤醒后继节点。

2.4.3 可中断式获取锁(acquireInterruptibly方法)

我们知道lock相较于synchronized有一些更方便的特性,比如能响应中断以及超时等待等特性,现在我们依旧采用通过学习源码的方式来看看能够响应中断是怎么实现的。可响应中断式锁可调用方法lock.lockInterruptibly();而该方法其底层会调用AQS的acquireInterruptibly方法,源码为:

public final void acquireInterruptibly(int arg)        throws InterruptedException {       if (Thread.interrupted())        throw new InterruptedException();    if (!tryAcquire(arg))        //线程获取锁失败        doAcquireInterruptibly(arg);}

在获取同步状态失败后就会调用doAcquireInterruptibly方法:

private void doAcquireInterruptibly(int arg)    throws InterruptedException {       //将节点插入到同步队列中    final Node node = addWaiter(Node.EXCLUSIVE);    boolean failed = true;    try {           for (;;) {               final Node p = node.predecessor();            //获取锁出队            if (p == head && tryAcquire(arg)) {                   setHead(node);                p.next = null; // help GC                failed = false;                return;            }            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                //线程中断抛异常                throw new InterruptedException();        }    } finally {           if (failed)            cancelAcquire(node);    }}

关键信息请看注释,现在看这段代码就很轻松了吧:),与acquire方法逻辑几乎一致,唯一的区别是当parkAndCheckInterrupt返回true时即线程阻塞时该线程被中断,代码抛出被中断异常。而不想中断异常的acquire方法,主动调用

static void selfInterrupt() {       Thread.currentThread().interrupt();}

2.4.4 超时等待式获取锁(tryAcquireNanos()方法)

通过调用lock.tryLock(timeout,TimeUnit)方式达到超时等待获取锁的效果,该方法会在三种情况下才会返回:

  1. 在超时时间内,当前线程成功获取了锁;
  2. 当前线程在超时时间内被中断;
  3. 超时时间结束,仍未获得锁返回false。

我们仍然通过采取阅读源码的方式来学习底层具体是怎么实现的,该方法会调用AQS的方法tryAcquireNanos(),源码为:

public final boolean tryAcquireNanos(int arg, long nanosTimeout)        throws InterruptedException {       if (Thread.interrupted())        throw new InterruptedException();    return tryAcquire(arg) ||        //实现超时等待的效果        doAcquireNanos(arg, nanosTimeout);}

很显然这段源码最终是靠doAcquireNanos方法实现超时等待的效果,该方法源码如下:

private boolean doAcquireNanos(int arg, long nanosTimeout)        throws InterruptedException {       if (nanosTimeout <= 0L)        return false;    //1. 根据超时时间和当前时间计算出截止时间    final long deadline = System.nanoTime() + nanosTimeout;    final Node node = addWaiter(Node.EXCLUSIVE);    boolean failed = true;    try {           for (;;) {               final Node p = node.predecessor();            //2. 当前线程获得锁出队列            if (p == head && tryAcquire(arg)) {                   setHead(node);                p.next = null; // help GC                failed = false;                return true;            }            // 3.1 重新计算超时时间            nanosTimeout = deadline - System.nanoTime();            // 3.2 已经超时返回false            if (nanosTimeout <= 0L)                return false;            // 3.3 线程阻塞等待             if (shouldParkAfterFailedAcquire(p, node) &&                nanosTimeout > spinForTimeoutThreshold)                LockSupport.parkNanos(this, nanosTimeout);            // 3.4 线程被中断抛出被中断异常            if (Thread.interrupted())                throw new InterruptedException();        }    } finally {           if (failed)            cancelAcquire(node);    }}

程序逻辑如图所示:

img

超时等待式获取锁(doAcquireNanos()方法)

程序逻辑同独占锁可响应中断式获取基本一致,唯一的不同在于获取锁失败后,对超时时间的处理上,在第1步会先计算出按照现在时间和超时时间计算出理论上的截止时间,比如当前时间是8h10min,超时时间是10min,那么根据deadline = System.nanoTime() + nanosTimeout计算出刚好达到超时时间时的系统时间就是8h 10min+10min = 8h 20min。然后根据deadline - System.nanoTime()就可以判断是否已经超时了,比如,当前系统时间是8h 30min很明显已经超过了理论上的系统时间8h 20min,deadline - System.nanoTime()计算出来就是一个负数,自然而然会在3.2步中的If判断之间返回false。如果还没有超时即3.2步中的if判断为true时就会继续执行3.3步通过LockSupport.parkNanos使得当前线程阻塞,同时在3.4步增加了对中断的检测,若检测出被中断直接抛出被中断异常。

2.4.5 共享锁的获取(acquireShared()方法)

ReentrantReadWriteLock 里的sync

在聊完AQS对独占锁的实现后,我们继续一鼓作气的来看看共享锁是怎样实现的?共享锁的获取方法为acquireShared,源码为:

public final void acquireShared(int arg) {       if (tryAcquireShared(arg) < 0)        doAcquireShared(arg);}

这段源码的逻辑很容易理解,在该方法中会首先调用tryAcquireShared方法,tryAcquireShared返回值是一个int类型,当返回值为大于等于0的时候方法结束说明获得成功获取锁,否则,表明获取同步状态失败即所引用的线程获取锁失败,会执行doAcquireShared方法,该方法的源码为:

private void doAcquireShared(int arg) {       final Node node = addWaiter(Node.SHARED);    boolean failed = true;    try {           boolean interrupted = false;        for (;;) {               final Node p = node.predecessor();            if (p == head) {                   int r = tryAcquireShared(arg);                if (r >= 0) {                       // 当该节点的前驱节点是头结点且成功获取同步状态                    setHeadAndPropagate(node, r);                    p.next = null; // help GC                    if (interrupted)                        selfInterrupt();                    failed = false;                    return;                }            }            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    } finally {           if (failed)            cancelAcquire(node);    }}

现在来看这段代码会不会很容易了?逻辑几乎和独占式锁的获取一模一样,这里的自旋过程中能够退出的条件是当前节点的前驱节点是头结点并且tryAcquireShared(arg)返回值大于等于0即能成功获得同步状态

2.4.6 共享锁的释放(releaseShared()方法)

共享锁的释放在AQS中会调用方法releaseShared:

public final boolean releaseShared(int arg) {       if (tryReleaseShared(arg)) {           doReleaseShared();        return true;    }    return false;}

当成功释放同步状态之后即tryReleaseShared会继续执行doReleaseShared方法:

private void doReleaseShared() {       /*     * Ensure that a release propagates, even if there are other     * in-progress acquires/releases.  This proceeds in the usual     * way of trying to unparkSuccessor of head if it needs     * signal. But if it does not, status is set to PROPAGATE to     * ensure that upon release, propagation continues.     * Additionally, we must loop in case a new node is added     * while we are doing this. Also, unlike other uses of     * unparkSuccessor, we need to know if CAS to reset status     * fails, if so rechecking.     */    for (;;) {           Node h = head;        if (h != null && h != tail) {               int ws = h.waitStatus;            if (ws == Node.SIGNAL) {                   if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                    continue;            // loop to recheck cases                unparkSuccessor(h);            }            else if (ws == 0 &&                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                continue;                // loop on failed CAS        }        if (h == head)                   // loop if head changed            break;    }}

这段方法跟独占式锁释放过程有点点不同,在共享式锁的释放过程中,对于能够支持多个线程同时访问的并发组件,必须保证多个线程能够安全的释放同步状态,这里采用的CAS保证,当CAS操作失败continue,在下一次循环中进行重试。

2.4.7 可中断(acquireSharedInterruptibly()方法),超时等待(tryAcquireSharedNanos()方法)

关于可中断锁以及超时等待的特性其实现和独占式锁可中断获取锁以及超时等待的实现几乎一致,具体的就不再说了

2.5 Condition队列

参考 https://blog.csdn.net/qq_38293564/article/details/80554516

2.5.1 Object的监视器方法与Condition接口对比如下:

对比项 Object监视器方法 Condition
前置条件 获取对象的监视器锁 调用Lock.lock()获取锁调用Lock.newCondition()获取Condition对象
调用方法 直接调用如:object.wait() 直接调用如:condition.await()
等待队列个数 一个 多个
当前线程释放锁并进入等待队列 支持 支持
当前线程释放锁并进入等待队列,在等待状态中不响应中断 不支持 支持
当前线程释放锁并进入超时等待状态 支持 支持
当前线程释放锁并进入等待状态到将来的某个时间 不支持 支持
唤醒等待队列中的一个线程 支持 支持
唤醒等待队列中的全部线程 支持 支持

2.5.2 内部结构

Condition作为AQS的内部类,复用了AQS的内部的Node,维护一个队列(单向),成员只有头结点和尾节点。

    public class ConditionObject implements Condition, java.io.Serializable {           private static final long serialVersionUID = 1173984872572414699L;        /** First node of condition queue. */        private transient Node firstWaiter;        /** Last node of condition queue. */        private transient Node lastWaiter;

每一个Condition包含一个等待队列,Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用Condition.await()方法之后,将会以当前线程构造节点,并将节点从尾部加入等待队列
在这里插入图片描述
并发包中的Lock(更确切地说是同步器)拥有一个同步队列和多个等待队列,其对应关系如下图所示

2.5.3 源码分析

2.5.3.1 await()方法

调用await()方法的线程是成功获取了锁的线程,也就是同步队列中的首节点(被前一个节点唤醒),或者根本没有入CLH队列。
该方法会将当前线程构造节点并加入condition队列中,然后释放同步状态(锁),唤醒CLH队列的的后继节点,然后当前线程会进入等待状态。

public final void await() throws InterruptedException {       // 0 检测线程中断状态    if (Thread.interrupted())        throw new InterruptedException();    //1. 将当前线程包装为Node节点加入等待condition队列    Node node = addConditionWaiter();    //2. 释放同步状态,也就是释放锁,并唤醒CLH队列的下一个节点。记录当前节点的状态。    int savedState = fullyRelease(node);    int interruptMode = 0;    //3. 检测该节点是否在CLH队列中,    //3.1 调用await肯定不在CLH队列中,为什么还需要自旋?    //如果不在,则说明该线程,没有进入CLH队列,可能其他逻辑意外唤醒当前线程,则继续挂起?    while (!isOnSyncQueue(node)) {           // 3. 1挂起线程        LockSupport.park(this);        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)            break;    }    //4 被其他condition sign后, 重新竞争锁。把savedState带上。    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)        interruptMode = REINTERRUPT;    //5 清理条件队列中的不是在等待条件的节点    if (node.nextWaiter != null) // clean up if cancelled        unlinkCancelledWaiters();    if (interruptMode != 0)        reportInterruptAfterWait(interruptMode);}//加入等待队列是通过addConditionWaiter()方法来完成的:private Node addConditionWaiter() {       // 尾节点    Node t = lastWaiter;    // 尾节点如果不是CONDITION状态,则表示该节点不处于等待状态,需要清理节点    if (t != null && t.waitStatus != Node.CONDITION) {           unlinkCancelledWaiters();        t = lastWaiter;    }    // 根据当前线程创建Node节点    Node node = new Node(Thread.currentThread(), Node.CONDITION);    // 将该节点加入等待队列的末尾    if (t == null)        firstWaiter = node;    else        t.nextWaiter = node;    lastWaiter = node;    return node;}

在这里插入图片描述

将当前线程加入到等待队列之后,需要释放同步状态,该操作通过fullyRelease(Node)方法来完成:

final int fullyRelease(Node node) {       boolean failed = true;    try {           // 获取同步状态        int savedState = getState();        // 释放锁        if (release(savedState)) {               failed = false;            return savedState;        } else {               throw new IllegalMonitorStateException();        }    } finally {           if (failed)            node.waitStatus = Node.CANCELLED;    }}

isOnSyncQueue(Node)方法自旋检查,是否在CLH队列中,不在挂起当前线程。

final boolean isOnSyncQueue(Node node) {       //1 节点状态为CONDITION,或者前驱节点为null,返回false    if (node.waitStatus == Node.CONDITION || node.prev == null)        return false;    //2 后继节点不为null,那么肯定在同步队列中    if (node.next != null) // If has successor, it must be on queue        return true;    //3 从CLH队列尾查找,当前节点在CLH队列否    return findNodeFromTail(node);}

若线程在同步队列中,且获取了同步状态,可能会调用unlinkCancelledWaiters()方法来清理等待队列中不为CONDITION 状态的节点:

private void unlinkCancelledWaiters() {       Node t = firstWaiter;    Node trail = null;    while (t != null) {           Node next = t.nextWaiter;        if (t.waitStatus != Node.CONDITION) {               t.nextWaiter = null;            if (trail == null)                firstWaiter = next;            else                trail.nextWaiter = next;            if (next == null)                lastWaiter = trail;        }        else            trail = t;        t = next;    }}

2.5.3.2 signal方法

将会唤醒在condition队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到CLH队列中。

public final void signal() {       // 判断是否是当前线程获取了锁    if (!isHeldExclusively())        throw new IllegalMonitorStateException();    //1 唤醒等待队列的首节点    Node first = firstWaiter;    if (first != null)        doSignal(first);}

该方法最终调用doSignal(Node)方法来唤醒节点:

private void doSignal(Node first) {       do {           //2自旋, 把等待队列的首节点移除之后,要修改首结点        if ( (firstWaiter = first.nextWaiter) == null)            lastWaiter = null;        first.nextWaiter = null;    } while (!transferForSignal(first) &&                (first = firstWaiter) != null);}

将节点移动到同步队列是通过transferForSignal(Node)方法完成的:

final boolean transferForSignal(Node node) {       //3 尝试将该节点的状态从CONDITION修改为0    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))        return false;        //4 将节点加入到CLH队列尾部,返回该节点的前驱节点    Node p = enq(node);    int ws = p.waitStatus;    //5 如果前驱节点的状态为CANCEL或者修改waitStatus失败,则直接唤醒当前线程    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))        LockSupport.unpark(node.thread);    return true;}

在这里插入图片描述
被唤醒后的线程,将从await()方法中的while循环中退出(因为此时isOnSyncQueue(Node)方法返回true),进而调用acquireQueued()方法加入到获取同步状态的竞争中。

成功获取了锁之后,被唤醒的线程将从先前调用的await()方法返回,此时,该线程再一次获取了锁。

2.5.3.3 signalAll

Condition的signalAll()方法,相当于对等待队列的每个节点均执行一次signal()方法,效果就是将等待队列中的所有节点移动到同步队列中。将所有等待队列中的node放入同步队列,并逐一唤醒。

        public final void signalAll() {               if (!isHeldExclusively())                throw new IllegalMonitorStateException();            Node first = firstWaiter;            if (first != null)                doSignalAll(first);        }        private void doSignalAll(Node first) {               lastWaiter = firstWaiter = null;            do {                   Node next = first.nextWaiter;                first.nextWaiter = null;                transferForSignal(first);                first = next;            } while (first != null);        }

参考

https://www.jianshu.com/p/cc308d82cc71

https://www.jianshu.com/p/279baac48960
https://segmentfault.com/a/1190000015562787
https://segmentfault.com/a/1190000015804888
方腾飞:《Java并发编程的艺术》

上一篇:ReentrantLock
下一篇:java线程安全的集合

发表评论

最新留言

能坚持,总会有不一样的收获!
[***.219.124.196]2025年04月09日 20时43分29秒