JAVA多线程第三部分(二) AQS
发布日期:2021-05-06 20:02:32 浏览次数:14 分类:精选文章

本文共 9680 字,大约阅读时间需要 32 分钟。

并发笔记传送门:

ReentrantLockSemaphore这两个接口之间存在许多共同点。这两个类都可以用做一个“阀门”,即每次只允许一定数量的线程通过,并当线程到达阀门时,可以通过(在调用lockacquire时成功返回),也可以等待(在调用lockacquire时阻塞),还可以取消(在调用tryLocktryAcquire时返回“假”,表示在指定的时间内锁是不可用的或者无法获得许可)。而且,这两个接口都支持可中断的、不可中断的以及限时的获取操作,并且也都支持等待线程执行公平或非公平的队列操作。

事实上,它们在实现时都使用了一个共同的基类,即AbstractQueuedSynchronizer(AQS),这个类也是其他许多同步类的基类。AQS是一个用于构建锁和同步器的框架,许多同步器都可以通过AQS很容易并且高效地构造出来。不仅ReentrantLockSemaphore是基于AQS构建的,还包括CountDownLatchReentrantReadWriteLockSynchronousQueueFutureTask

AQS 中的获取与释放

在基于 AQS 构建的同步器类中,最基本的操作包括各种形式的锁获取和释放操作。并且获取操作是一种依赖状态的操作,并且通常会阻塞。

如下伪代码给出了 AQS 获取与释放的简单逻辑。 (Douge Lea 老爷子源码写的太精妙,得慢慢品)

boolean acquire() throws InterruptedException{           while (当前状态不允许获取操作){               if (需要阻塞获取请求){                   如果当前线程不再队列中,则将其插入队列                阻塞当前线程            }else {                   返回失败            }        }        更新同步器的状态        如果线程位于队列中,则将其移除队列        返回成功    }    void release(){           更新同步器的状态        if(新的状态允许某个阻塞的线程获取成功){               解除队列中一个或多个线程的阻塞状态        }    }

一个获取操作包括两部分:

  • 首先,同步器判断当前状态是否允许获得操作,如果是,则允许线程执行,否则获取操作将阻塞或失败。这种判断是根据同步器的语义决定的。例如:对于锁来说,如果它没有被某个线程持有,那么就能成功的获取;而对于闭锁来说,如果它处于结束状态,那么也能被成功的获取。
  • 其次,就是更新同步器的状态,获取同步器的某个线程可能会对其他线程能否获取该同步器照成影响。例如,当获取一个锁后,锁的状态将『未备持有』变成『已被持有』,而从Semaphore中获取许可后,将把许可证的数量减1。然而,当一个线程获取闭锁时,并不会影响其他线程能否获取它。

根据同步器性质的不同,实现的方法各有差异:

  • 独占操作(例如:ReentrantLock):如果某个同步器支持独占的获取操作,那么需要实现 AQStryAcquiretryReleasetryHeldExeclusively等方法。
  • 非独占操作(例如:Semphore,CountDownLatch):对于支持共享获取的同步器,则应该实现tryAcquireSharedtryReleaseShared等方法

AQS 中的的acquireacquireSharedreleasereleaseShared等方法都将调用这些方法在子类中带有前缀try的版本来判断某个操作是否能够执行。

一个简单的闭锁

OneShotLatch包含两个公有方法:awaitsignal,分别对应获取和释放操作。起初,闭锁是关闭的,任何调用 await 的线程都将阻塞并直到闭锁打开。当通过调用 signal 打开闭锁时,所有等待中的线程豆浆被释放,并且随后到达闭锁的线程也允许被执行。

import java.util.concurrent.locks.AbstractQueuedSynchronizer;public class OneShotLatch {       private final Sync sync = new Sync();    public void signal() {           sync.releaseShared(0);    }    public void await() throws InterruptedException {           sync.acquireSharedInterruptibly(0);    }    private static class Sync extends AbstractQueuedSynchronizer {           @Override        protected int tryAcquireShared(int arg) {               int state = getState();            //如果闭锁是开的(state==1),那么这个操作讲成功,否则失败            System.out.println("state = " + state);            return getState() == 1 ? 1 : -1;        }        @Override        protected boolean tryReleaseShared(int arg) {               setState(1);//打开闭锁            return true;//其他线程可以获取该闭锁        }    }    public static void main(String[] args) {           OneShotLatch osl = new OneShotLatch();        new Thread(() -> {               System.out.println("we are in main 01 thread, and start osl await");            try {                   osl.await();            } catch (InterruptedException e) {                   e.printStackTrace();            }            System.out.println("main 01 thread osl await finished");        }).start();        new Thread(() -> {               System.out.println("we are in main 02 thread, and start osl await");            try {                   osl.await();            } catch (InterruptedException e) {                   e.printStackTrace();            }            System.out.println("main 02 thread osl await finished");        }).start();        new Thread(() -> {               System.out.println("we are in main 03 thread, and first sleep 5s");            try {                   Thread.sleep(5000);                System.out.println("we are in main 03 thread, and start osl await");                osl.await();            } catch (InterruptedException e) {                   e.printStackTrace();            }            System.out.println("main 03 thread osl await finished");        }).start();        new Thread(() -> {               System.out.println("we are in work thread,and we start waiting");            try {                   Thread.sleep(3000);            } catch (InterruptedException e) {                   e.printStackTrace();            }            System.out.println("work finish,now we signal main thread");            osl.signal();        }).start();    }}

java.util.concurrent中的 AQS

并发包中有许多的可阻塞类,例如ReentrantLockSemaphoreCountDownLatchReentrantReadWriteLockSynchronousQueueFutureTask等,都是基于 AQS 构建的。

ReentrantLock

ReentrantLock只支持独占方式的获取操作,因此它实现了 tryAcquiretryReleaseisHeldExclusively方法。

  • ReentrantLock将同步状态state用于保存锁获取操作的次数。
  • 维护了一个 owner 变量来保存当前线程,但是在1.6上进行了重构增加了AbstractOwnableSynchronizerexclusiveOwnerThread来保存当前线程。只有在当前线程刚刚获取到锁,或者正要释放锁的时候,才会修改这个变量。
    • tryRelease 中检查 owner 域,从而确保当前线程在执行 unlock 操作前已经获取了锁
    • tryAcquire 中将使用 owner 域判断获取操作是重入还是竞争的

非公平锁版本(默认)

static final class NonfairSync extends Sync {           private static final long serialVersionUID = 7316153563782823691L;        /**         * Performs lock.  Try immediate barge, backing up to normal         * acquire on failure.         */        final void lock() {               if (compareAndSetState(0, 1))                setExclusiveOwnerThread(Thread.currentThread());            else                acquire(1);        }        protected final boolean tryAcquire(int acquires) {               return nonfairTryAcquire(acquires);        }    }
final boolean nonfairTryAcquire(int acquires) {               final Thread current = Thread.currentThread();            int c = getState();            if (c == 0) {                   if (compareAndSetState(0, acquires)) {                       setExclusiveOwnerThread(current);                    return true;                }            }            else if (current == getExclusiveOwnerThread()) {                   int nextc = c + acquires;                if (nextc < 0) // overflow                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }            return false;        }

当一个线程尝试获取锁时,tryAcquire 将首先检查锁的状态:

  • 未被持有:通过 compareAndSetState(0, acquires)原子性的操作尝试更新锁的状态以表示已经被持有。
  • 已经持有:判断当前现场是否为锁的拥有者,是:计数递增(所以 ReentrantLock 是可重入锁);不是:获取操作失败。

SemaphoreCountDownLatch

SemaphoreCountDownLatch是属于支持共享获取的同步器,因此它们实现了 tryAcquireSharedtryReleaseShared 方法

Semaphorestate 用于保存当前可用许可数量。

Semaphore的非公平锁实现为例:

static final class NonfairSync extends Sync {           private static final long serialVersionUID = -2694183684443567898L;        NonfairSync(int permits) {               super(permits);        }        protected int tryAcquireShared(int acquires) {               return nonfairTryAcquireShared(acquires);        }    }        final int nonfairTryAcquireShared(int acquires) {               for (;;) {                   int available = getState();                int remaining = available - acquires;                //这个代码很精髓啊                //remaining < 0 :如果没有足够的许可,退出循环                //compareAndSetState设置成功,退出循环;设置失败,重新尝试                if (remaining < 0 ||                    compareAndSetState(available, remaining))                    return remaining;            }        }        protected final boolean tryReleaseShared(int releases) {               for (;;) {                   int current = getState();                int next = current + releases;                if (next < current) // overflow                    throw new Error("Maximum permit count exceeded");                if (compareAndSetState(current, next))                    return true;            }        }

tryAcquireShared首先会计算剩余许可的数量。

  • 如果数量不足,那么会返回一个值表示获取操作失败。
  • 如果还有剩余的许可数量,会通过compareAndSetState以原子的方式来降低许可的计数。如果这个操作成功(意味着从上次读取后就没有被修改过),那么就返回一个值表示操作获取成功。

CountDownLatch使用 AQS 的方式与Semaphore很相似:同步状态 state用来保存当前的计数值

await() 调用关系:

graph LRawait-->acquireSharedInterruptibly acquireSharedInterruptibly --> tryReleaseShared

await 调用 acquire,当计数器为0时,acquire 将立即返回,否则将执行doAcquireSharedInterruptibly进入阻塞。

private static final class Sync extends AbstractQueuedSynchronizer {           Sync(int count) {               setState(count);        }        int getCount() {               return getState();        }        protected int tryAcquireShared(int acquires) {               return (getState() == 0) ? 1 : -1;        }        protected boolean tryReleaseShared(int releases) {               // Decrement count; signal when transition to zero            for (;;) {                   int c = getState();                if (c == 0)                    return false;                int nextc = c-1;                if (compareAndSetState(c, nextc))                    return nextc == 0;            }        }    }    public final void acquireSharedInterruptibly(int arg)            throws InterruptedException {           if (Thread.interrupted())            throw new InterruptedException();        if (tryAcquireShared(arg) < 0)            doAcquireSharedInterruptibly(arg);    }    public void await() throws InterruptedException {           sync.acquireSharedInterruptibly(1);    }

countDown()调用关系:

graph LRcountDown-->releaseSharedreleaseShared-->tryReleaseShared

countDown() 调用 tryReleaseShared来完成计数递减,当计数值为0时,执行doReleaseShared解除所有等待线程的阻塞。

public void countDown() {           sync.releaseShared(1);    }    public final boolean releaseShared(int arg) {           if (tryReleaseShared(arg)) {               doReleaseShared();            return true;        }        return false;    }

FutureTask

老爷子新版本并没有直接使用 AQS,通过sun.misc.Unsafe UNSAFE 来操作实现。

ReentrantReadWriteLock

ReadWriteLock接口表示存在两个锁:读取锁和写入锁,但在基于 AQS 实现的 ReentrantReadWriteLock 中,单个AQS子类将同时管理读取加锁和写入加锁。

ReentrantReadWriteLock使用了两个16位的状态分别表示写入锁和读取锁的计数。在读取锁上的操作使用共享的获取、释放方式;在写入锁上的操作使用独占的获取、释放方式。

AQS 在内部维护一个等待线程队列,其中记录了某个线程是独占(Node.EXCLUSIVE)还是共享(Node.SHARE)访问。

小结

AQS 源码真心复杂,本篇只是粗浅的记录下并发包内的 AQS 的使用情况,下一篇争取啃下 AQS 的实现原理

Doug Lea老爷子一个人撸起了 java 并发的大旗,真滴猛。

以下是是网上比较好的 AQS 源码解析,记录一下

上一篇:JAVA多线程第三部分(三)原子变量和非阻塞同步机制
下一篇:JAVA多线程第三部分(一)显式锁与synchronized

发表评论

最新留言

表示我来过!
[***.240.166.169]2025年04月01日 21时40分41秒