
本文共 24668 字,大约阅读时间需要 82 分钟。
【JDK源码分析系列】AbstractQueuedSynchronizer 源码分析 -- 锁的获取与释放
【1】AbstractQueuedSynchronizer 整体架构
1. AQS 中队列只有两个 : 同步队列 + 条件队列,底层数据结构两者都是链表;
2. 图中有四种颜色的线代表四种不同的场景,1、2、3 序号代表看的顺序【2】获取锁
Lock.lock () 方法来获得锁,最终目的是想让线程获得对资源的访问权,Lock 一般是 AQS 的子类,lock 方法根据情况一般会选择调用 AQS 的 acquire 或 tryAcquire 方法,acquire 方法 AQS 已经实现了,tryAcquire 方法是等待子类去实现,acquire 方法制定了获取锁的框架,先尝试使用 tryAcquire 方法获取锁,获取不到时,再入同步队列中等待锁,tryAcquire 方法 AQS 中直接抛出一个异常,表明需要子类去实现,子类可以根据同步器的 state 状态来决定是否能够获得锁。
【2.1】同步状态获取流程示意图
大致步骤总结 :
1. 使用 tryAcquire 方法尝试获得锁,获得锁直接返回,获取不到锁的走 2;
2. 把当前线程组装成节点(Node),追加到同步队列的尾部(addWaiter); 3. 自旋,使同步队列中当前节点的前置节点状态为 signal 后,然后阻塞自己;【2.2】超时获取同步状态流程示意图
【2.3】获取锁的相关方法源码分析
【2.3.1】AbstractQueuedSynchronizer -- public final void acquire(int arg)
/** * 独占式获取同步状态 * 若当前线程获取同步状态成功,则由该方法返回; * 否则,进入同步队列等待 * * 注意在调用该方法时,子类中需要重写tryAcquire(int arg)方法; */// 排它模式下,尝试获得锁public final void acquire(int arg) { /** * tryAcquire(arg) : 该方法为子类可重写的方法 * if判断中前一个条件不成立,即当前线程没有成功获取同步状态,则执行 acquireQueued 方法 * acquireQueued 方法 : 该方法用于已经排队的线程在独占非中断模式下尝试获取同步状态 */ // tryAcquire 方法是需要实现类去实现的, // 实现思路一般都是 cas 给 state 赋值来决定是否能获得锁 if (!tryAcquire(arg) && // addWaiter 入参代表是排他模式 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //中断当前线程 selfInterrupt();}
流程说明(流程见整体架构图中红色场景) :
1. 尝试执行一次 tryAcquire,如果成功直接返回,失败走 2;
2. 线程尝试进入同步队列,首先调用 addWaiter 方法,把当前线程放到同步队列的队尾; 3. 接着调用 acquireQueued 方法,两个作用,1:阻塞当前节点,2:节点被唤醒时,使其能够获得锁; 4. 如果 2、3 失败了,中断线程;【2.3.2】AbstractQueuedSynchronizer -- final boolean acquireQueued(final Node node, int arg)
/** * 该方法用于已经排队的线程尝试获取同步状态; * 队列中的节点做自旋操作; */// 主要做两件事情:// 1:通过不断的自旋尝试使自己前一个节点的状态变成 signal,然后阻塞自己// 2:获得锁的线程执行完成之后,释放锁时,会把阻塞的 node 唤醒, node 唤醒之后再次自旋,尝试获得锁// 返回 false 表示获得锁成功,返回 true 表示失败final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 死循环,不断地尝试在当前节点获取同步状态 // 即队列中的节点进行自旋操作 for (;;) { // 选上一个节点 //p 为当前节点的前驱节点 final Node p = node.predecessor(); //若 p 为当前队列的首节点,即当前节点为队列首节点的后继节点 //并且当前节点尝试获取同步状态成功 // // 有两种情况会走到 p == head: // 1:node 之前没有获得锁,进入 acquireQueued 方法时,才发现他的前置节点就是头节点,于是尝试获得一次锁; // 2:node 之前一直在阻塞沉睡,然后被唤醒,此时唤醒 node 的节点正是其前一个节点,也能走到 if // 如果自己 tryAcquire 成功,就立马把自己设置成 head,把上一个节点移除 // 如果 tryAcquire 失败,尝试进入同步队列 if (p == head && tryAcquire(arg)) { // 获得锁,设置成 head 节点 //将 node 设置为当前队列的首节点 setHead(node); //断开 p 节点与后继节点的联系,即实现节点 p 出队处理 //p被回收 p.next = null; // help GC failed = false; return interrupted; } /** * 当前节点获取同步状态失败后的处理 * shouldParkAfterFailedAcquire : 用于判断当前节点获取同步状态失败后 * 是否需要阻塞; * parkAndCheckInterrupt : 阻塞当前线程并检查当前线程是否被中断; * * 若当前节点是 SIGNAL 状态,则当前节点需要被阻塞; * 则阻塞当前节点,继续遍历下一个节点 */ // shouldParkAfterFailedAcquire 把 node 的前一个节点状态置为 SIGNAL // 只要前一个节点状态是 SIGNAL了,那么自己就可以阻塞(park)了 // parkAndCheckInterrupt 阻塞当前线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } /** * 无论当前节点是否获取同步状态,该端代码肯定会执行 * cancelAcquire : 用于中断不间断的获取同步状态的尝试 */ } finally { /** * 如果出现异常或者出现中断,就会执行finally的取消线程的请求操作 */ // 如果获得node的锁失败,将 node 从队列中移除 if (failed) cancelAcquire(node); }}
【2.3.3】AbstractQueuedSynchronizer -- private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
/** * 该方法用于判断当前节点获取同步状态失败后是否需要阻塞; */// shouldParkAfterFailedAcquire 这个方法的主要目的就是把前一个节点的状态置为 SIGNAL,// 只要前一个节点的状态是 SIGNAL,当前节点就可以阻塞了//(parkAndCheckInterrupt 就是使节点阻塞的方法)//// 当前线程可以安心等待的标准,就是前一个节点线程状态是SIGNAL了// 入参 pred 是当前节点 node 的前一个节点// 关键操作:// 1:确认前置节点是否有效,无效的话,一直往前找到状态不是取消的节点// 2: 把前置节点状态置为 SIGNAL// 1、2 两步操作,有可能一次就成功,有可能需要外部循环多次才能成功,但最后一定是可以成功的private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取前向节点的状态 int ws = pred.waitStatus; // 如果前一个节点 waitStatus 状态已经是SIGNAL了,直接返回,不需要在自旋了 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; // 如果当前节点状态已经被取消了 // static final int CANCELLED = 1; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ /** * 从当前节点 node 开始前项遍历队列 * 直到节点的等待状态不是CANCELLED */ // 找到前一个状态不是取消的节点,把当前 node 挂在有效节点身上 // 因为节点状态是取消的话,是无效的,是不能作为 node 的前置节点的, // 所以必须找到 node 的有效节点才行 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); /** * 将遍历过程中处于CANCELLED状态的节点从队列中删除 */ pred.next = node; // 否则直接把节点状态置为 SIGNAL } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // SIGNAL 状态的意义:同步队列中的节点在自旋获取锁的时候, // 如果前一个节点的状态是 SIGNAL,那么自己就可以阻塞休息了,否则自己一直自旋尝试获得锁 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}
【2.3.4】AbstractQueuedSynchronizer -- private void cancelAcquire(Node node)
/** * cancelAcquire : 删除一个不断的获取尝试 * 将 node 节点从队列中删除; */private void cancelAcquire(Node node) { //若节点不存在则直接返回 if (node == null) return; node.thread = null; /** * 遍历队列跳过其中处于 CANCELLED 状态的节点 * 功能确定 node 节点的有效地前项节点 */ Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; /** * 记录 pred 节点的下一个节点状态,方便 CAS 处理 */ Node predNext = pred.next; // 更新 node 状态为 CANCELLED node.waitStatus = Node.CANCELLED; /** * 场景一: * 若当前节点是队列的尾节点 * 则将确定的 pred 设置为 tail */ if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { /** * 场景二 : * 当前节点既不是Head节点的后继节点也不是Tail节点 * * 前驱的状态需要设置为 SIGNAL */ int ws; if (pred != head && //确保 node 节点的前驱状态为 SIGNAL ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { /** * 场景三 : * 当前节点是 head 节点的后继节点 * 将 node 的后继节点激活,从而实现将 node 节点出队的目的 */ unparkSuccessor(node); } node.next = node; // help GC }}
【2.4】AbstractQueuedSynchronizer -- public final void acquireInterruptibly(int arg)
/** * 独占式获取同步状态,同时该方法会响应中断; * 当前线程未获取到同步状态则进入同步队列中; * 若当前线程被中断,该方法抛出 InterruptedException 并返回; */ public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } /** * 获取同步状态同时该方法会响应中断; */ private void doAcquireInterruptibly(int arg) throws InterruptedException { //添加类型为独占式的节点 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { //自旋 for (;;) { /** * 获取node节点的前驱 * 若节点是头节点的后继节点则尝试获取同步状态 */ final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { // 获得锁,设置成 head 节点 //将 node 设置为当前队列的首节点 setHead(node); //断开 p 节点与后继节点的联系,即实现节点 p 出队处理 //p被回收 p.next = null; // help GC failed = false; return; } /** * 获取同步状态失败后的处理逻辑 * 判断同步状态失败后是否需要阻塞 */ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //若线程需要阻塞且需要中断则抛出异常 throw new InterruptedException(); } } finally { /** * 如果出现异常或者出现中断,就会执行finally的取消线程的请求操作 */ // 如果获得node的锁失败,将 node 从队列中移除 if (failed) cancelAcquire(node); } }
【2.5】AbstractQueuedSynchronizer -- public final boolean tryAcquireNanos(int arg, long nanosTimeout)
/** * 在 acquireInterruptibly 方法的基础上添加了超时功能 * * 若当前线程在超时时间内没有获取同步状态,返回false; * 若获取了同步状态则返回true; */ public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); } /** * 获取同步状态同时该方法会响应中断并具有超时相应功能; */ private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { //超时直接返回 false 表示获取锁失败 if (nanosTimeout <= 0L) return false; // 初始化截止时间点 final long deadline = System.nanoTime() + nanosTimeout; // 队列中添加独占类型的节点 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { // 队列中的节点进行自旋操作 for (;;) { //获取 node 的前项节点 final Node p = node.predecessor(); //p 是 head 则再次尝试获取锁 if (p == head && tryAcquire(arg)) { // 获得锁,设置成 head 节点 //将 node 设置为当前队列的首节点 setHead(node); //断开 p 节点与后继节点的联系,即实现节点 p 出队处理 //p被回收 p.next = null; // help GC failed = false; return true; } // 重新计算超时值 nanosTimeout = deadline - System.nanoTime(); //判断是否超时 if (nanosTimeout <= 0L) return false; /** * 判断获取同步状态失败后是否需要阻塞当前线程 * spinForTimeoutThreshold : 剩余时间; * nanosTimeout > spinForTimeoutThreshold : 超时时间未到 */ if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) // 阻塞当前线程,最长不超过 nanosTimeout 纳秒 LockSupport.parkNanos(this, nanosTimeout); /** * 判断当前线程是否中断 * 若当前线程中断则抛出异常 */ if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) /** * 如果出现异常或者出现中断,就会执行finally的取消线程的请求操作 */ cancelAcquire(node); } }
【3】获取共享锁
【3.1】获取共享锁的相关方法源码分析
【3.1.1】AbstractQueuedSynchronizer -- public final void acquireShared(int arg)
/** * 共享式的获取同步状态 * 若当前线程未获取同步状态,将会进入同步队列等待 * 同一时刻可以有多个线程获取同步状态 */ // 共享模式下,尝试获得锁 // tryAcquireShared 首先尝试获得锁,返回值小于 0 表示没有获得锁 // 共享锁和排他锁最大的不同在于:对于同一个共享资源 // 排他锁只能让一个线程获得 // 共享锁可以让多个线程获得,arg 可以被子类当做任意参数,比如当做可获得锁线程的最大个数 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } /** * 尝试在共享模式下获取同步状态 */ private void doAcquireShared(int arg) { // 创建 SHARED 类型的节点并加入到同步队列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; // 死循环,节点 node 在队列中自旋 for (;;) { // 获取节点 node 的前项节点 final Node p = node.predecessor(); // 当节点 node 是队列头节点的后继时,开始尝试获取同步状态 if (p == head) { /** * tryAcquireShared(arg) : 该方法其他类有待重写以实现特定的功能; */ int r = tryAcquireShared(arg); // r > 0, 表示当前线程获取同步状态的同时,存在其他线程也获取了同步状态 if (r >= 0) { // 将 node 设置为头节点的同时,唤醒获取同步状态的其他线程 // 此处和排它锁也不同,排它锁使用的是 setHead,这里的setHeadAndPropagate 方法 // 不仅仅是把当前节点设置成 head,还会唤醒头节点的后续节点 setHeadAndPropagate(node, r); p.next = null; // help GC // 若线程需要中断则中断线程 if (interrupted) selfInterrupt(); failed = false; return; } } /** * 判断尝试获取同步状态失败后,是否需要阻塞节点 * 若需要阻塞节点,则将节点阻塞并中断线程 */ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { /** * 如果出现异常或者出现中断,就会执行finally的取消线程的请求操作 */ if (failed) /** * cancelAcquire : 删除一个不断的获取尝试 * 将 node 节点从队列中删除; */ cancelAcquire(node); } }
【3.1.2】AbstractQueuedSynchronizer -- private void setHeadAndPropagate(Node node, int propagate)
/** * 设置头节点状态 * 两个入参 * 1. 当前成功获取共享锁的节点 * 2. tryAcquireShared 方法的返回值 * tryAcquireShared 返回值说明 : * <0 : 失败 * =0 : 成功,后续的节点获取同步状态全部失败 * >0 : 成功,后续的节点获取同步状态成功 */// 主要做两件事情// 1:把当前节点设置成头节点// 2:看看后续节点有无正在等待,并且也是共享模式的,有的话唤醒这些节点private void setHeadAndPropagate(Node node, int propagate) { // 记录原队列的首节点 Node h = head; // Record old head for check below /** * 设置头节点,此处将获取到锁的节点设置为头节点 * 注:这里是获取到锁之后的操作,不需要并发控制 */ // 当前节点设置成头节点 setHead(node); /** * propagate > 0 : 传播被调用者指定,表明共享状态下存在后继节点需要被唤醒; * propagate > 0 表示已经有节点获得共享锁了 * * h == null || h.waitStatus < 0 : 旧的头节点相关判断, * 旧的头节点不存在或者旧的头节点的等待状态 < 0; * (h = head) == null || h.waitStatus < 0) : 新的头节点相关判断, * 新的头节点不存在或者新的头节点的等待状态 < 0; * 即头节点的后继节点需要被唤醒 */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { /** * 获取新的头节点的后继节点, * 当 s == null || s.isShared() 时, * 即当前节点的后继节点是共享类型或者当前没有后继节点, * 进行唤醒操作 */ Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); }}
【3.1.3】AbstractQueuedSynchronizer -- public final void acquireSharedInterruptibly(int arg)
/** * 共享式的获取同步状态 * 若当前线程未获取同步状态,将会进入同步队列等待 * 同一时刻可以有多个线程获取同步状态 * 同时该方法响应中断,若当前线程被中断则该方法会抛出 InterruptedException 异常并返回 */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } /** * 共享式获取同步状态并会响应中断 */ private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 创建 SHARED 类型的节点并加入到同步队列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { // node 节点进行自旋处理 for (;;) { // 获取节点 node 的前项节点 final Node p = node.predecessor(); // 当 node 节点是队列首节点的后继,则尝试获取同步状态 if (p == head) { // tryAcquireShared(arg) : 该方法其他类有待重写以实现特定的功能; int r = tryAcquireShared(arg); // r > 0, 表示当前线程获取同步状态的同时,存在其他线程也获取了同步状态 if (r >= 0) { // 设置 node 为头节点的同时,唤醒队列中所有获取同步状态的线程 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } /** * 判断获取同步状态失败时,节点是否需要阻塞 * 节点需要阻塞则阻塞并中断线程,同时抛出异常 */ if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { /** * 如果出现异常或者出现中断,就会执行finally的取消线程的请求操作 * cancelAcquire : 删除一个不断的获取尝试 * 将 node 节点从队列中删除 */ if (failed) cancelAcquire(node); } }
【3.1.4】AbstractQueuedSynchronizer -- public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
/** * 共享式的获取同步状态 * 若当前线程未获取同步状态,将会进入同步队列等待 * 同一时刻可以有多个线程获取同步状态 * 同时该方法响应中断,若当前线程被中断则该方法会抛出 InterruptedException 异常并返回 * 同时该方法拥有超时限制,若当前线程在超时时间内没有获取同步状态,返回false; * 若获取了同步状态则返回true; */ public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); } /** * 共享式获取同步状态 * 响应中断 * 超时限制 */ private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; // 初始化截止时间点 final long deadline = System.nanoTime() + nanosTimeout; // 队列中添加共享类型的节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { // 队列中的节点进行自旋操作 for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return true; } } // 重新计算超时值 nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; /** * 判断获取同步状态失败后是否需要阻塞当前线程 * spinForTimeoutThreshold : 剩余时间; * nanosTimeout > spinForTimeoutThreshold : 超时时间未到 */ if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) // 阻塞当前线程,最长不超过 nanosTimeout 纳秒 LockSupport.parkNanos(this, nanosTimeout); /** * 判断当前线程是否中断 * 若当前线程中断则抛出异常 */ if (Thread.interrupted()) throw new InterruptedException(); } /** * 如果出现异常或者出现中断,就会执行finally的取消线程的请求操作 */ } finally { if (failed) cancelAcquire(node); } }
【4】释放锁
释放锁的触发时机就是 Lock.unLock () 方法,目的就是让线程释放对资源的访问权(流程见整体架构图蓝色路线)
【4.1】AbstractQueuedSynchronizer -- public final boolean release(int arg)
/** * 独占式的释放同步状态 * * 该方法在释放同步状态之后,将同步队列中首节点的后继节点包含的线程唤醒 */// unlock 的基础方法public final boolean release(int arg) { /** * tryRelease(arg) : 该方法是有待重写的方法; */ // tryRelease 交给实现类去实现,一般就是用当前同步器状态减去 arg, // 如果返回 true 说明成功释放锁 if (tryRelease(arg)) { Node h = head; // 头节点不为空,并且非初始化状态 if (h != null && h.waitStatus != 0) // 当队列存在首节点,释放队列首节点的后继节点 // 从头开始唤醒等待锁的节点 unparkSuccessor(h); return true; } return false;}
【4.1.1】AbstractQueuedSynchronizer -- private void unparkSuccessor(Node node)
/** * 激活当前节点的后继节点 */// 当线程释放锁成功后,从 node 开始唤醒同步队列中的节点// 通过唤醒机制,保证线程不会一直在同步队列中阻塞等待private void unparkSuccessor(Node node) { /** * 获取当前节点的等待状态 */ // node 节点是当前释放锁的节点,也是同步队列的头节点 int ws = node.waitStatus; // 如果节点已经被取消了,把节点的状态置为初始化 if (ws < 0) /** * 设置当前节点的等待状态为初始状态;(CAS方式) * 表明当前节点已经处理完毕; * 处理完毕的节点将会出队; */ compareAndSetWaitStatus(node, ws, 0); // 拿出 node 节点的后面一个节点 Node s = node.next; /** * s指向当前节点的后继节点 * s == null || s.waitStatus > 0 : 表明当前节点没有后继节点或当前节点的后继节点 * 处于 CANCELLD 状态 */ // 遇到以上这两种情况,就从队尾开始,向前遍历,找到第一个 waitStatus 字段不是被取消的 if (s == null || s.waitStatus > 0) { s = null; //在当前节点的后继节点为无效的情况下,从队列尾部前向遍历寻找非空 //且不处于 CANCELLD 状态的节点 // //主要是因为节点被阻塞的时候是在 acquireQueued 方法里面被阻塞的, //唤醒时也一定会在 acquireQueued 方法里面被唤醒,唤醒的条件是, //判断当前节点的前置节点是否是头节点, //所以这里必须使用从尾到头的迭代顺序才行,目的就是为了过滤掉无效的前置节点, //不然节点被唤醒时,发现其前置节点还是无效节点,就又会陷入阻塞 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //当找到队列中可以激活的节点,激活节点中的线程 if (s != null) LockSupport.unpark(s.thread);}
【4.2】AbstractQueuedSynchronizer -- public final boolean releaseShared(int arg)
// 共享模式下的释放public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 这个方法就是线程在获得锁时,唤醒后续节点时调用的方法 doReleaseShared(); return true; } return false;}
【4.2.1】AbstractQueuedSynchronizer -- private void doReleaseShared()
/** * 该方法执行共享式释放同步状态的操作 */// 释放后置共享节点private void doReleaseShared() { /** * 该方法即使在存在其他正在处理的获取与释放方法正在进行的条件下, * 也需要确保一个释放的传播; * 通常的处理方式是 : * 1. 头结点的等待状态为 SIGNAL 时,释放头节点的后继节点; * 2. 头节点的等待状态非 SIGNAL 时,将等待状态设置为 PROPAGATE, * 以确保传播能够继续进行; * 3. 进行循环遍历原因 : * 1). 防止在释放同步状态的过程中有新的节点加入队列; * 2). 确保 CAS 方式成功设置等待状态; */ //开启对队列的循环遍历 for (;;) { // 节点 h 一直指向当前队列的头节点 Node h = head; if (h != null && h != tail) { /** * 获取节点的等待状态 * * Node.SIGNAL,当前节点处于SIGNAL状态时,在释放时需要通知后继节点 */ int ws = h.waitStatus; // 如果队列状态是 SIGNAL ,说明后续节点都需要唤醒 if (ws == Node.SIGNAL) { /** * Node.SIGNAL,当前节点处于SIGNAL状态时,在释放时需要通知后继节点 * 这里需要控制并发,因为入口有setHeadAndPropagate跟release两个, * 避免两次unpark */ // CAS 保证只有一个节点可以运行唤醒的操作 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 激活头节点的后继节点中的线程 unparkSuccessor(h); } /** * 如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE * 确保以后可以传递下去 */ else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } /** * 如果头结点没有发生变化,表示设置完成,退出循环 * 如果头结点发生变化,比如说其他线程获取到了锁,为了使自己的唤醒动作可以传递, * 必须进行重试 */ // 第一种情况,头节点没有发生移动,结束 // 第二种情况,因为此方法可以被两处调用,一次是获得锁的地方,一处是释放锁的地方, // 加上共享锁的特性就是可以多个线程获得锁,也可以释放锁,这就导致头节点可能会发生变化, // 如果头节点发生了变化,就继续循环,一直循环到头节点不变化时,结束循环 if (h == head) // loop if head changed break; }}
【5】AbstractQueuedSynchronizer 共通方法
【5.1】private Node addWaiter(Node mode) 与 private Node enq(final Node node) 方法
/** * 向队列中添加等待节点 */// 方法主要目的:node 追加到同步队列的队尾// 入参 mode 表示 Node 的模式(排它模式还是共享模式)// 出参是新增的 node// 主要思路:// 新 node.pre = 队尾// 队尾.next = 新 nodeprivate Node addWaiter(Node mode) { /** * 新建节点,并为该节点关联在等待队列中的节点 * * 入参 mode 用于表示节点的类型 * 类型 : * 独占式、共享式 */ // 初始化 Node Node node = new Node(Thread.currentThread(), mode); // 这里的逻辑和 enq 一致,enq 的逻辑仅仅多了队尾是空,初始化的逻辑 // 这个思路在 java 源码中很常见,先简单的尝试放一下,成功立马返回,如果不行,再 while 循环 // 很多时候,这种算法可以帮忙解决大部分的问题,大部分的入队可能一次都能成功,无需自旋 Node pred = tail; if (pred != null) { node.prev = pred; //在当前队列的尾部插入新增节点;(CAS方式) if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } /** * 若同步队列为空,则创建同步队列 */ //自旋保证node加入到队尾 enq(node); return node;}/** * 该方法向队列添加节点 */// 线程加入同步队列中方法,追加到队尾// 这里需要重点注意的是,返回值是添加 node 的前一个节点private Node enq(final Node node) { for (;;) { //t 指向队列的尾节点 Node t = tail; //t 为null 表明当前队列为空,则初始化队列; // 如果队尾为空,说明当前同步队列都没有初始化,进行初始化 if (t == null) { // Must initialize //设置队列的头结点,初始化后头、尾指针都指向该节点;(CAS方式) if (compareAndSetHead(new Node())) tail = head; // 队尾不为空,将当前节点追加到队尾 } else { //在当前队列的尾部插入新增节点;(CAS方式) node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } }}
参考致谢
本博客为博主的学习实践总结,并参考了众多博主的博文,在此表示感谢,博主若有不足之处,请批评指正。 【1】Java并发编程的艺术【2】
【3】
【4】
【5】
【6】
【7】
【8】
发表评论
最新留言
关于作者
